wilfred-s commented on code in PR #413:
URL: https://github.com/apache/yunikorn-core/pull/413#discussion_r891838139


##########
go.mod:
##########
@@ -23,7 +23,7 @@ go 1.16
 
 require (
        github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
-       github.com/apache/yunikorn-scheduler-interface 
v0.0.0-20220413101040-d6ba6ec504f9
+       github.com/apache/yunikorn-scheduler-interface 
v1.0.1-0.20220602100449-8c6c3fab407c

Review Comment:
   Please check the doc on updating go.mod; this needs to be a pseudo version.



##########
pkg/scheduler/objects/application.go:
##########
@@ -1050,26 +1049,54 @@ func (sa *Application) tryReservedAllocate(headRoom 
*resources.Resource, nodeIte
                        alloc := newReservedAllocation(Unreserved, 
reserve.nodeID, unreserveAsk)
                        return alloc
                }
-               // check if this fits in the queue's head room
-               if !headRoom.FitInMaxUndef(ask.AllocatedResource) {
-                       continue
-               }
-               // check allocation possibility
-               alloc := sa.tryNode(reserve.node, ask)
-               // allocation worked fix the result and return
-               if alloc != nil {
-                       alloc.Result = AllocatedReserved
-                       return alloc
+
+               // Is it daemon set?
+               if ask.GetRequiredNode() == "" {
+                       // check if this fits in the queue's head room
+                       if !headRoom.FitInMaxUndef(ask.AllocatedResource) {
+                               continue
+                       }
+
+                       // check allocation possibility
+                       alloc := sa.tryNode(reserve.node, ask)
+
+                       // allocation worked fix the result and return
+                       if alloc != nil {
+                               alloc.Result = AllocatedReserved
+                               return alloc
+                       }
+               } else if 
reserve.node.GetCapacity().FitInMaxUndef(ask.GetAllocatedResource()) {
+                       log.Logger().Info("Triggering preemption process for 
daemon set ask",
+                               zap.String("ds allocation key", 
ask.AllocationKey))
+
+                       // try preemption and see if we can free up resource
+                       preemptor := NewSimplePreemptor(reserve.node)
+
+                       // Are there any victims/asks to preempt?
+                       victims := preemptor.GetVictims(ask)
+                       if len(victims) > 0 {
+                               log.Logger().Info("Found victims for daemon set 
ask preemption ",
+                                       zap.String("ds allocation key", 
ask.AllocationKey))
+                               zap.Int("no.of victims", len(victims))
+                               sa.notifyRMAllocationAskReleased(sa.rmID, 
victims, si.TerminationType_PREEMPTED_BY_SCHEDULER,

Review Comment:
   We release the allocation, not the ask, here.
   
   NOTE: We need to file a follow-up Jira to track, in the allocation, the ask 
that is triggering the preemption, and vice versa.



##########
pkg/scheduler/objects/simplepreemptor.go:
##########
@@ -0,0 +1,149 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+       "sort"
+       "sync"
+
+       "github.com/apache/yunikorn-core/pkg/common/resources"
+)
+
+type PreemptionContext struct {
+       Node        *Node
+       allocations []*AllocationAsk
+
+       sync.RWMutex
+}
+
+func NewSimplePreemptor(node *Node) *PreemptionContext {
+       preemptor := &PreemptionContext{
+               Node: node,
+       }
+       preemptor.filterAllocations()
+       preemptor.sortAllocations()
+       return preemptor
+}
+
+func (p *PreemptionContext) filterAllocations() {
+       p.Lock()
+       defer p.Unlock()
+       for _, allocation := range p.Node.GetAllAllocations() {
+               // skip daemon set pods
+               if allocation.Ask.GetRequiredNode() != "" {
+                       continue
+               }
+               p.allocations = append(p.allocations, allocation.Ask)

Review Comment:
   Allocations not Asks



##########
pkg/scheduler/objects/application.go:
##########
@@ -1050,26 +1049,54 @@ func (sa *Application) tryReservedAllocate(headRoom 
*resources.Resource, nodeIte
                        alloc := newReservedAllocation(Unreserved, 
reserve.nodeID, unreserveAsk)
                        return alloc
                }
-               // check if this fits in the queue's head room
-               if !headRoom.FitInMaxUndef(ask.AllocatedResource) {
-                       continue
-               }
-               // check allocation possibility
-               alloc := sa.tryNode(reserve.node, ask)
-               // allocation worked fix the result and return
-               if alloc != nil {
-                       alloc.Result = AllocatedReserved
-                       return alloc
+
+               // Is it daemon set?
+               if ask.GetRequiredNode() == "" {
+                       // check if this fits in the queue's head room
+                       if !headRoom.FitInMaxUndef(ask.AllocatedResource) {
+                               continue
+                       }

Review Comment:
   We need to do this check even if it is a daemon set pod.
   If we don't we could go over the queue quota.



##########
pkg/scheduler/objects/application_test.go:
##########
@@ -1045,18 +1046,18 @@ func TestGetTag(t *testing.T) {
 }
 
 func TestOnStatusChangeCalled(t *testing.T) {
-       app, testHandler := newApplicationWithHandler(appID1, "default", 
"root.a")
+       app, testHandler := NewApplicationWithHandler(appID1, "default", 
"root.a")

Review Comment:
   Why the change to an exported function?



##########
pkg/scheduler/objects/application.go:
##########
@@ -1050,26 +1049,54 @@ func (sa *Application) tryReservedAllocate(headRoom 
*resources.Resource, nodeIte
                        alloc := newReservedAllocation(Unreserved, 
reserve.nodeID, unreserveAsk)
                        return alloc
                }
-               // check if this fits in the queue's head room
-               if !headRoom.FitInMaxUndef(ask.AllocatedResource) {
-                       continue
-               }
-               // check allocation possibility
-               alloc := sa.tryNode(reserve.node, ask)
-               // allocation worked fix the result and return
-               if alloc != nil {
-                       alloc.Result = AllocatedReserved
-                       return alloc
+
+               // Is it daemon set?
+               if ask.GetRequiredNode() == "" {
+                       // check if this fits in the queue's head room
+                       if !headRoom.FitInMaxUndef(ask.AllocatedResource) {
+                               continue
+                       }
+
+                       // check allocation possibility
+                       alloc := sa.tryNode(reserve.node, ask)
+
+                       // allocation worked fix the result and return
+                       if alloc != nil {
+                               alloc.Result = AllocatedReserved
+                               return alloc
+                       }
+               } else if 
reserve.node.GetCapacity().FitInMaxUndef(ask.GetAllocatedResource()) {
+                       log.Logger().Info("Triggering preemption process for 
daemon set ask",
+                               zap.String("ds allocation key", 
ask.AllocationKey))
+
+                       // try preemption and see if we can free up resource
+                       preemptor := NewSimplePreemptor(reserve.node)
+
+                       // Are there any victims/asks to preempt?
+                       victims := preemptor.GetVictims(ask)
+                       if len(victims) > 0 {
+                               log.Logger().Info("Found victims for daemon set 
ask preemption ",
+                                       zap.String("ds allocation key", 
ask.AllocationKey))
+                               zap.Int("no.of victims", len(victims))
+                               sa.notifyRMAllocationAskReleased(sa.rmID, 
victims, si.TerminationType_PREEMPTED_BY_SCHEDULER,
+                                       "preempting asks to free up resources 
to run daemon set ask: "+ask.AllocationKey)
+                       } else {
+                               continue
+                       }
                }
        }
+
        // lets try this on all other nodes
        for _, reserve := range sa.reservations {
                iterator := nodeIterator()
                if iterator != nil {
-                       alloc := sa.tryNodesNoReserve(reserve.ask, iterator, 
reserve.nodeID)
-                       // have a candidate return it, including the node that 
was reserved
-                       if alloc != nil {
-                               return alloc
+                       // Other nodes cannot be tried for daemon set asks
+                       if reserve.ask.GetRequiredNode() == "" {
+                               alloc := sa.tryNodesNoReserve(reserve.ask, 
iterator, reserve.nodeID)
+                               // have a candidate return it, including the 
node that was reserved
+                               if alloc != nil {
+                                       return alloc
+                               }

Review Comment:
   See comment above: reverse the check at the start and this is not needed. 
There is a performance gain from not iterating over nodes unnecessarily.



##########
pkg/scheduler/objects/application.go:
##########
@@ -863,11 +863,10 @@ func (sa *Application) tryAllocate(headRoom 
*resources.Resource, nodeIterator fu
                                        zap.String("allocation key", 
request.AllocationKey))
                                return alloc
                        }
-                       if err := sa.reserveInternal(node, request); err != nil 
{
-                               log.Logger().Warn("Failed to reserve the 
required node",
-                                       zap.String("required node", 
node.NodeID),
-                                       zap.String("allocation key", 
request.AllocationKey),
-                                       zap.String("reason", err.Error()))
+
+                       alloc = newReservedAllocation(Reserved, node.NodeID, 
request)
+                       if alloc != nil {
+                               return alloc
                        }
                        continue

Review Comment:
   `newReservedAllocation` will always return an allocation. This can be 
simplified to just:
   ```
   return newReservedAllocation(Reserved, node.NodeID, request)
   ```



##########
pkg/rmproxy/rmproxy_mock.go:
##########
@@ -0,0 +1,77 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package rmproxy
+
+import (
+       "sync"
+
+       "github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
+)
+
+// implements RMProxy Event Handler
+// Resetting RM mock handler for the testing
+type MockedRMProxy struct {
+       handled bool
+       events  []interface{}
+       sync.RWMutex
+}
+
+func NewMockedRMProxy() *MockedRMProxy {
+       return &MockedRMProxy{}
+}
+
+// handle the RM update event

Review Comment:
   Nit: we should use Go standard comments. Now that we're moving it to its own 
file, we should fix it:
   // HandleEvent implements event handling for a limited set of events for 
testing 



##########
pkg/scheduler/objects/application.go:
##########
@@ -1050,26 +1049,54 @@ func (sa *Application) tryReservedAllocate(headRoom 
*resources.Resource, nodeIte
                        alloc := newReservedAllocation(Unreserved, 
reserve.nodeID, unreserveAsk)
                        return alloc
                }
-               // check if this fits in the queue's head room
-               if !headRoom.FitInMaxUndef(ask.AllocatedResource) {
-                       continue
-               }
-               // check allocation possibility
-               alloc := sa.tryNode(reserve.node, ask)
-               // allocation worked fix the result and return
-               if alloc != nil {
-                       alloc.Result = AllocatedReserved
-                       return alloc
+
+               // Is it daemon set?
+               if ask.GetRequiredNode() == "" {
+                       // check if this fits in the queue's head room
+                       if !headRoom.FitInMaxUndef(ask.AllocatedResource) {
+                               continue
+                       }
+
+                       // check allocation possibility
+                       alloc := sa.tryNode(reserve.node, ask)
+
+                       // allocation worked fix the result and return
+                       if alloc != nil {
+                               alloc.Result = AllocatedReserved
+                               return alloc
+                       }
+               } else if 
reserve.node.GetCapacity().FitInMaxUndef(ask.GetAllocatedResource()) {

Review Comment:
   `reserve.node.GetCapacity().FitInMaxUndef()` is the same as 
`reserve.node.FitIn()` which is executed under a lock and makes sure nothing 
changes while we check.
   However, I think this needs to be based on `node.GetAvailableResource()`.



##########
pkg/scheduler/objects/simplepreemptor.go:
##########
@@ -0,0 +1,149 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+       "sort"
+       "sync"
+
+       "github.com/apache/yunikorn-core/pkg/common/resources"
+)
+
+type PreemptionContext struct {
+       Node        *Node
+       allocations []*AllocationAsk
+
+       sync.RWMutex
+}
+
+func NewSimplePreemptor(node *Node) *PreemptionContext {
+       preemptor := &PreemptionContext{
+               Node: node,
+       }
+       preemptor.filterAllocations()
+       preemptor.sortAllocations()
+       return preemptor
+}
+
+func (p *PreemptionContext) filterAllocations() {
+       p.Lock()
+       defer p.Unlock()
+       for _, allocation := range p.Node.GetAllAllocations() {
+               // skip daemon set pods
+               if allocation.Ask.GetRequiredNode() != "" {

Review Comment:
   We should pull this information into the allocation as a boolean flag. 
This simplifies the code and allows the preemption context to use allocations 
and not asks.



##########
pkg/scheduler/objects/simplepreemptor.go:
##########
@@ -0,0 +1,149 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+       "sort"
+       "sync"
+
+       "github.com/apache/yunikorn-core/pkg/common/resources"
+)
+
+type PreemptionContext struct {
+       Node        *Node
+       allocations []*AllocationAsk
+
+       sync.RWMutex
+}

Review Comment:
   Do we need a context here or can we just pass in a node + resource size and 
handle it internally?



##########
pkg/scheduler/objects/application.go:
##########
@@ -1050,26 +1049,54 @@ func (sa *Application) tryReservedAllocate(headRoom 
*resources.Resource, nodeIte
                        alloc := newReservedAllocation(Unreserved, 
reserve.nodeID, unreserveAsk)
                        return alloc
                }
-               // check if this fits in the queue's head room
-               if !headRoom.FitInMaxUndef(ask.AllocatedResource) {
-                       continue
-               }
-               // check allocation possibility
-               alloc := sa.tryNode(reserve.node, ask)
-               // allocation worked fix the result and return
-               if alloc != nil {
-                       alloc.Result = AllocatedReserved
-                       return alloc
+
+               // Is it daemon set?
+               if ask.GetRequiredNode() == "" {

Review Comment:
   We should reverse this check:
   ```
   if ask.GetRequiredNode() != "" {
       do daemon set work (separate function?)
       return when done
   }
   do all non daemon set work
   ```
   



##########
pkg/scheduler/objects/application.go:
##########
@@ -1050,26 +1049,54 @@ func (sa *Application) tryReservedAllocate(headRoom 
*resources.Resource, nodeIte
                        alloc := newReservedAllocation(Unreserved, 
reserve.nodeID, unreserveAsk)
                        return alloc
                }
-               // check if this fits in the queue's head room
-               if !headRoom.FitInMaxUndef(ask.AllocatedResource) {
-                       continue
-               }
-               // check allocation possibility
-               alloc := sa.tryNode(reserve.node, ask)
-               // allocation worked fix the result and return
-               if alloc != nil {
-                       alloc.Result = AllocatedReserved
-                       return alloc
+
+               // Is it daemon set?
+               if ask.GetRequiredNode() == "" {
+                       // check if this fits in the queue's head room
+                       if !headRoom.FitInMaxUndef(ask.AllocatedResource) {
+                               continue
+                       }
+
+                       // check allocation possibility
+                       alloc := sa.tryNode(reserve.node, ask)
+
+                       // allocation worked fix the result and return
+                       if alloc != nil {
+                               alloc.Result = AllocatedReserved
+                               return alloc
+                       }
+               } else if 
reserve.node.GetCapacity().FitInMaxUndef(ask.GetAllocatedResource()) {
+                       log.Logger().Info("Triggering preemption process for 
daemon set ask",
+                               zap.String("ds allocation key", 
ask.AllocationKey))
+
+                       // try preemption and see if we can free up resource
+                       preemptor := NewSimplePreemptor(reserve.node)
+
+                       // Are there any victims/asks to preempt?
+                       victims := preemptor.GetVictims(ask)

Review Comment:
   We only need the `AllocatedResource` size of the ask; we always work on just 
one repeat.



##########
pkg/scheduler/objects/utilities_test.go:
##########
@@ -106,11 +105,11 @@ func newApplicationWithTags(appID, partition, queueName 
string, tags map[string]
        return NewApplication(siApp, user, nil, "")
 }
 
-func newApplicationWithHandler(appID, partition, queueName string) 
(*Application, *appEventHandler) {
+func NewApplicationWithHandler(appID, partition, queueName string) 
(*Application, *rmproxy.MockedRMProxy) {

Review Comment:
   No need to export



##########
pkg/scheduler/objects/simplepreemptor.go:
##########
@@ -0,0 +1,149 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+       "sort"
+       "sync"
+
+       "github.com/apache/yunikorn-core/pkg/common/resources"
+)
+
+type PreemptionContext struct {
+       Node        *Node
+       allocations []*AllocationAsk
+
+       sync.RWMutex
+}
+
+func NewSimplePreemptor(node *Node) *PreemptionContext {
+       preemptor := &PreemptionContext{
+               Node: node,
+       }
+       preemptor.filterAllocations()
+       preemptor.sortAllocations()
+       return preemptor
+}
+
+func (p *PreemptionContext) filterAllocations() {
+       p.Lock()
+       defer p.Unlock()
+       for _, allocation := range p.Node.GetAllAllocations() {
+               // skip daemon set pods
+               if allocation.Ask.GetRequiredNode() != "" {
+                       continue
+               }
+               p.allocations = append(p.allocations, allocation.Ask)
+       }
+}
+
+// sort based on the following criteria in the specified order:
+// 1. By type (regular pods, opted out pods, driver/owner pods),
+// 2. By priority (least priority ask placed first),
+// 3. By Create time or age of the ask (younger ask placed first),
+// 4. By resource (ask with lesser allocated resources placed first)
+func (p *PreemptionContext) sortAllocations() {
+       p.Lock()
+       defer p.Unlock()
+       sort.SliceStable(p.allocations, func(i, j int) bool {
+               l := p.allocations[i]
+               r := p.allocations[j]
+
+               // sort based on the type
+               lAskType := 1         // regular pod
+               if l.IsOriginator() { // driver/owner pod
+                       lAskType = 3
+               } else if !l.GetAllowPreemption() { // opted out pod
+                       lAskType = 2
+               }
+               rAskType := 1
+               if r.IsOriginator() {
+                       rAskType = 3
+               } else if !r.GetAllowPreemption() {
+                       rAskType = 2
+               }
+               if lAskType < rAskType {
+                       return true
+               }
+               if lAskType > rAskType {
+                       return false
+               }
+
+               // sort based on the priority
+               lPriority := l.GetPriority()
+               rPriority := r.GetPriority()
+               if lPriority < rPriority {
+                       return true
+               }
+               if lPriority > rPriority {
+                       return false
+               }
+
+               // sort based on the age
+               if !l.GetCreateTime().Equal(r.GetCreateTime()) {
+                       return l.GetCreateTime().After(r.GetCreateTime())
+               }
+
+               // sort based on the allocated resource
+               lResource := l.GetAllocatedResource()
+               rResource := r.GetAllocatedResource()
+               if !resources.Equals(lResource, rResource) {
+                       delta := resources.Sub(lResource, rResource)
+                       return !resources.StrictlyGreaterThanZero(delta)
+               }
+               return true
+       })
+}
+
+func (p *PreemptionContext) GetVictims(sourceAsk *AllocationAsk) 
[]*AllocationAsk {

Review Comment:
   This should return Allocation objects, not ask objects: we preempt 
allocations, not asks. The input should be the required resource, not the full 
ask.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to