wilfred-s commented on code in PR #413:
URL: https://github.com/apache/yunikorn-core/pull/413#discussion_r891838139
##########
go.mod:
##########
@@ -23,7 +23,7 @@ go 1.16
require (
github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
- github.com/apache/yunikorn-scheduler-interface
v0.0.0-20220413101040-d6ba6ec504f9
+ github.com/apache/yunikorn-scheduler-interface
v1.0.1-0.20220602100449-8c6c3fab407c
Review Comment:
Please check the doc on updating go.mod; this needs to be a pseudo version.
##########
pkg/scheduler/objects/application.go:
##########
@@ -1050,26 +1049,54 @@ func (sa *Application) tryReservedAllocate(headRoom
*resources.Resource, nodeIte
alloc := newReservedAllocation(Unreserved,
reserve.nodeID, unreserveAsk)
return alloc
}
- // check if this fits in the queue's head room
- if !headRoom.FitInMaxUndef(ask.AllocatedResource) {
- continue
- }
- // check allocation possibility
- alloc := sa.tryNode(reserve.node, ask)
- // allocation worked fix the result and return
- if alloc != nil {
- alloc.Result = AllocatedReserved
- return alloc
+
+ // Is it daemon set?
+ if ask.GetRequiredNode() == "" {
+ // check if this fits in the queue's head room
+ if !headRoom.FitInMaxUndef(ask.AllocatedResource) {
+ continue
+ }
+
+ // check allocation possibility
+ alloc := sa.tryNode(reserve.node, ask)
+
+ // allocation worked fix the result and return
+ if alloc != nil {
+ alloc.Result = AllocatedReserved
+ return alloc
+ }
+ } else if
reserve.node.GetCapacity().FitInMaxUndef(ask.GetAllocatedResource()) {
+ log.Logger().Info("Triggering preemption process for
daemon set ask",
+ zap.String("ds allocation key",
ask.AllocationKey))
+
+ // try preemption and see if we can free up resource
+ preemptor := NewSimplePreemptor(reserve.node)
+
+ // Are there any victims/asks to preempt?
+ victims := preemptor.GetVictims(ask)
+ if len(victims) > 0 {
+ log.Logger().Info("Found victims for daemon set
ask preemption ",
+ zap.String("ds allocation key",
ask.AllocationKey))
+ zap.Int("no.of victims", len(victims))
+ sa.notifyRMAllocationAskReleased(sa.rmID,
victims, si.TerminationType_PREEMPTED_BY_SCHEDULER,
Review Comment:
We release the allocation here, not the ask.
NOTE: We need to file a follow-up Jira to track the ask in the allocation
(and vice versa) that is triggering the preemption.
##########
pkg/scheduler/objects/simplepreemptor.go:
##########
@@ -0,0 +1,149 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+ "sort"
+ "sync"
+
+ "github.com/apache/yunikorn-core/pkg/common/resources"
+)
+
+type PreemptionContext struct {
+ Node *Node
+ allocations []*AllocationAsk
+
+ sync.RWMutex
+}
+
+func NewSimplePreemptor(node *Node) *PreemptionContext {
+ preemptor := &PreemptionContext{
+ Node: node,
+ }
+ preemptor.filterAllocations()
+ preemptor.sortAllocations()
+ return preemptor
+}
+
+func (p *PreemptionContext) filterAllocations() {
+ p.Lock()
+ defer p.Unlock()
+ for _, allocation := range p.Node.GetAllAllocations() {
+ // skip daemon set pods
+ if allocation.Ask.GetRequiredNode() != "" {
+ continue
+ }
+ p.allocations = append(p.allocations, allocation.Ask)
Review Comment:
Allocations not Asks
##########
pkg/scheduler/objects/application.go:
##########
@@ -1050,26 +1049,54 @@ func (sa *Application) tryReservedAllocate(headRoom
*resources.Resource, nodeIte
alloc := newReservedAllocation(Unreserved,
reserve.nodeID, unreserveAsk)
return alloc
}
- // check if this fits in the queue's head room
- if !headRoom.FitInMaxUndef(ask.AllocatedResource) {
- continue
- }
- // check allocation possibility
- alloc := sa.tryNode(reserve.node, ask)
- // allocation worked fix the result and return
- if alloc != nil {
- alloc.Result = AllocatedReserved
- return alloc
+
+ // Is it daemon set?
+ if ask.GetRequiredNode() == "" {
+ // check if this fits in the queue's head room
+ if !headRoom.FitInMaxUndef(ask.AllocatedResource) {
+ continue
+ }
Review Comment:
We need to do this check even if it is a daemon set pod.
If we don't, we could go over the queue quota.
##########
pkg/scheduler/objects/application_test.go:
##########
@@ -1045,18 +1046,18 @@ func TestGetTag(t *testing.T) {
}
func TestOnStatusChangeCalled(t *testing.T) {
- app, testHandler := newApplicationWithHandler(appID1, "default",
"root.a")
+ app, testHandler := NewApplicationWithHandler(appID1, "default",
"root.a")
Review Comment:
Why the change to an exported function?
##########
pkg/scheduler/objects/application.go:
##########
@@ -1050,26 +1049,54 @@ func (sa *Application) tryReservedAllocate(headRoom
*resources.Resource, nodeIte
alloc := newReservedAllocation(Unreserved,
reserve.nodeID, unreserveAsk)
return alloc
}
- // check if this fits in the queue's head room
- if !headRoom.FitInMaxUndef(ask.AllocatedResource) {
- continue
- }
- // check allocation possibility
- alloc := sa.tryNode(reserve.node, ask)
- // allocation worked fix the result and return
- if alloc != nil {
- alloc.Result = AllocatedReserved
- return alloc
+
+ // Is it daemon set?
+ if ask.GetRequiredNode() == "" {
+ // check if this fits in the queue's head room
+ if !headRoom.FitInMaxUndef(ask.AllocatedResource) {
+ continue
+ }
+
+ // check allocation possibility
+ alloc := sa.tryNode(reserve.node, ask)
+
+ // allocation worked fix the result and return
+ if alloc != nil {
+ alloc.Result = AllocatedReserved
+ return alloc
+ }
+ } else if
reserve.node.GetCapacity().FitInMaxUndef(ask.GetAllocatedResource()) {
+ log.Logger().Info("Triggering preemption process for
daemon set ask",
+ zap.String("ds allocation key",
ask.AllocationKey))
+
+ // try preemption and see if we can free up resource
+ preemptor := NewSimplePreemptor(reserve.node)
+
+ // Are there any victims/asks to preempt?
+ victims := preemptor.GetVictims(ask)
+ if len(victims) > 0 {
+ log.Logger().Info("Found victims for daemon set
ask preemption ",
+ zap.String("ds allocation key",
ask.AllocationKey))
+ zap.Int("no.of victims", len(victims))
+ sa.notifyRMAllocationAskReleased(sa.rmID,
victims, si.TerminationType_PREEMPTED_BY_SCHEDULER,
+ "preempting asks to free up resources
to run daemon set ask: "+ask.AllocationKey)
+ } else {
+ continue
+ }
}
}
+
// lets try this on all other nodes
for _, reserve := range sa.reservations {
iterator := nodeIterator()
if iterator != nil {
- alloc := sa.tryNodesNoReserve(reserve.ask, iterator,
reserve.nodeID)
- // have a candidate return it, including the node that
was reserved
- if alloc != nil {
- return alloc
+ // Other nodes cannot be tried for daemon set asks
+ if reserve.ask.GetRequiredNode() == "" {
+ alloc := sa.tryNodesNoReserve(reserve.ask,
iterator, reserve.nodeID)
+ // have a candidate return it, including the
node that was reserved
+ if alloc != nil {
+ return alloc
+ }
Review Comment:
See the comment above: reverse the check at the start and this is not needed.
There is also a performance gain from not iterating over nodes unnecessarily.
##########
pkg/scheduler/objects/application.go:
##########
@@ -863,11 +863,10 @@ func (sa *Application) tryAllocate(headRoom
*resources.Resource, nodeIterator fu
zap.String("allocation key",
request.AllocationKey))
return alloc
}
- if err := sa.reserveInternal(node, request); err != nil
{
- log.Logger().Warn("Failed to reserve the
required node",
- zap.String("required node",
node.NodeID),
- zap.String("allocation key",
request.AllocationKey),
- zap.String("reason", err.Error()))
+
+ alloc = newReservedAllocation(Reserved, node.NodeID,
request)
+ if alloc != nil {
+ return alloc
}
continue
Review Comment:
`newReservedAllocation` will always return an allocation. This can be
simplified to just:
```
return newReservedAllocation(Reserved, node.NodeID, request)
```
##########
pkg/rmproxy/rmproxy_mock.go:
##########
@@ -0,0 +1,77 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package rmproxy
+
+import (
+ "sync"
+
+ "github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
+)
+
+// implements RMProxy Event Handler
+// Resetting RM mock handler for the testing
+type MockedRMProxy struct {
+ handled bool
+ events []interface{}
+ sync.RWMutex
+}
+
+func NewMockedRMProxy() *MockedRMProxy {
+ return &MockedRMProxy{}
+}
+
+// handle the RM update event
Review Comment:
Nit: we should use standard Go doc comments. Now that we're moving it to its
own file, we should fix it:
// HandleEvent implements event handling for a limited set of events for
testing
##########
pkg/scheduler/objects/application.go:
##########
@@ -1050,26 +1049,54 @@ func (sa *Application) tryReservedAllocate(headRoom
*resources.Resource, nodeIte
alloc := newReservedAllocation(Unreserved,
reserve.nodeID, unreserveAsk)
return alloc
}
- // check if this fits in the queue's head room
- if !headRoom.FitInMaxUndef(ask.AllocatedResource) {
- continue
- }
- // check allocation possibility
- alloc := sa.tryNode(reserve.node, ask)
- // allocation worked fix the result and return
- if alloc != nil {
- alloc.Result = AllocatedReserved
- return alloc
+
+ // Is it daemon set?
+ if ask.GetRequiredNode() == "" {
+ // check if this fits in the queue's head room
+ if !headRoom.FitInMaxUndef(ask.AllocatedResource) {
+ continue
+ }
+
+ // check allocation possibility
+ alloc := sa.tryNode(reserve.node, ask)
+
+ // allocation worked fix the result and return
+ if alloc != nil {
+ alloc.Result = AllocatedReserved
+ return alloc
+ }
+ } else if
reserve.node.GetCapacity().FitInMaxUndef(ask.GetAllocatedResource()) {
Review Comment:
`reserve.node.GetCapacity().FitInMaxUndef()` is the same as
`reserve.node.FitIn()`, which is executed under a lock and makes sure nothing
changes while we check.
However, I think this needs to be based on `node.GetAvailableResource()`.
##########
pkg/scheduler/objects/simplepreemptor.go:
##########
@@ -0,0 +1,149 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+ "sort"
+ "sync"
+
+ "github.com/apache/yunikorn-core/pkg/common/resources"
+)
+
+type PreemptionContext struct {
+ Node *Node
+ allocations []*AllocationAsk
+
+ sync.RWMutex
+}
+
+func NewSimplePreemptor(node *Node) *PreemptionContext {
+ preemptor := &PreemptionContext{
+ Node: node,
+ }
+ preemptor.filterAllocations()
+ preemptor.sortAllocations()
+ return preemptor
+}
+
+func (p *PreemptionContext) filterAllocations() {
+ p.Lock()
+ defer p.Unlock()
+ for _, allocation := range p.Node.GetAllAllocations() {
+ // skip daemon set pods
+ if allocation.Ask.GetRequiredNode() != "" {
Review Comment:
We should pull this information into the allocation as a boolean flag.
Simplifies the code and allows the preemption context to use allocations and
not asks
##########
pkg/scheduler/objects/simplepreemptor.go:
##########
@@ -0,0 +1,149 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+ "sort"
+ "sync"
+
+ "github.com/apache/yunikorn-core/pkg/common/resources"
+)
+
+type PreemptionContext struct {
+ Node *Node
+ allocations []*AllocationAsk
+
+ sync.RWMutex
+}
Review Comment:
Do we need a context here or can we just pass in a node + resource size and
handle it internally?
##########
pkg/scheduler/objects/application.go:
##########
@@ -1050,26 +1049,54 @@ func (sa *Application) tryReservedAllocate(headRoom
*resources.Resource, nodeIte
alloc := newReservedAllocation(Unreserved,
reserve.nodeID, unreserveAsk)
return alloc
}
- // check if this fits in the queue's head room
- if !headRoom.FitInMaxUndef(ask.AllocatedResource) {
- continue
- }
- // check allocation possibility
- alloc := sa.tryNode(reserve.node, ask)
- // allocation worked fix the result and return
- if alloc != nil {
- alloc.Result = AllocatedReserved
- return alloc
+
+ // Is it daemon set?
+ if ask.GetRequiredNode() == "" {
Review Comment:
We should reverse this check:
```
if ask.GetRequiredNode() != "" {
do daemon set work (separate function?)
return when done
}
do all non daemon set work
```
##########
pkg/scheduler/objects/application.go:
##########
@@ -1050,26 +1049,54 @@ func (sa *Application) tryReservedAllocate(headRoom
*resources.Resource, nodeIte
alloc := newReservedAllocation(Unreserved,
reserve.nodeID, unreserveAsk)
return alloc
}
- // check if this fits in the queue's head room
- if !headRoom.FitInMaxUndef(ask.AllocatedResource) {
- continue
- }
- // check allocation possibility
- alloc := sa.tryNode(reserve.node, ask)
- // allocation worked fix the result and return
- if alloc != nil {
- alloc.Result = AllocatedReserved
- return alloc
+
+ // Is it daemon set?
+ if ask.GetRequiredNode() == "" {
+ // check if this fits in the queue's head room
+ if !headRoom.FitInMaxUndef(ask.AllocatedResource) {
+ continue
+ }
+
+ // check allocation possibility
+ alloc := sa.tryNode(reserve.node, ask)
+
+ // allocation worked fix the result and return
+ if alloc != nil {
+ alloc.Result = AllocatedReserved
+ return alloc
+ }
+ } else if
reserve.node.GetCapacity().FitInMaxUndef(ask.GetAllocatedResource()) {
+ log.Logger().Info("Triggering preemption process for
daemon set ask",
+ zap.String("ds allocation key",
ask.AllocationKey))
+
+ // try preemption and see if we can free up resource
+ preemptor := NewSimplePreemptor(reserve.node)
+
+ // Are there any victims/asks to preempt?
+ victims := preemptor.GetVictims(ask)
Review Comment:
We only need the `AllocatedResource` size of the ask; we always work on just
one repeat.
##########
pkg/scheduler/objects/utilities_test.go:
##########
@@ -106,11 +105,11 @@ func newApplicationWithTags(appID, partition, queueName
string, tags map[string]
return NewApplication(siApp, user, nil, "")
}
-func newApplicationWithHandler(appID, partition, queueName string)
(*Application, *appEventHandler) {
+func NewApplicationWithHandler(appID, partition, queueName string)
(*Application, *rmproxy.MockedRMProxy) {
Review Comment:
No need to export
##########
pkg/scheduler/objects/simplepreemptor.go:
##########
@@ -0,0 +1,149 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package objects
+
+import (
+ "sort"
+ "sync"
+
+ "github.com/apache/yunikorn-core/pkg/common/resources"
+)
+
+type PreemptionContext struct {
+ Node *Node
+ allocations []*AllocationAsk
+
+ sync.RWMutex
+}
+
+func NewSimplePreemptor(node *Node) *PreemptionContext {
+ preemptor := &PreemptionContext{
+ Node: node,
+ }
+ preemptor.filterAllocations()
+ preemptor.sortAllocations()
+ return preemptor
+}
+
+func (p *PreemptionContext) filterAllocations() {
+ p.Lock()
+ defer p.Unlock()
+ for _, allocation := range p.Node.GetAllAllocations() {
+ // skip daemon set pods
+ if allocation.Ask.GetRequiredNode() != "" {
+ continue
+ }
+ p.allocations = append(p.allocations, allocation.Ask)
+ }
+}
+
+// sort based on the following criteria in the specified order:
+// 1. By type (regular pods, opted out pods, driver/owner pods),
+// 2. By priority (least priority ask placed first),
+// 3. By Create time or age of the ask (younger ask placed first),
+// 4. By resource (ask with lesser allocated resources placed first)
+func (p *PreemptionContext) sortAllocations() {
+ p.Lock()
+ defer p.Unlock()
+ sort.SliceStable(p.allocations, func(i, j int) bool {
+ l := p.allocations[i]
+ r := p.allocations[j]
+
+ // sort based on the type
+ lAskType := 1 // regular pod
+ if l.IsOriginator() { // driver/owner pod
+ lAskType = 3
+ } else if !l.GetAllowPreemption() { // opted out pod
+ lAskType = 2
+ }
+ rAskType := 1
+ if r.IsOriginator() {
+ rAskType = 3
+ } else if !r.GetAllowPreemption() {
+ rAskType = 2
+ }
+ if lAskType < rAskType {
+ return true
+ }
+ if lAskType > rAskType {
+ return false
+ }
+
+ // sort based on the priority
+ lPriority := l.GetPriority()
+ rPriority := r.GetPriority()
+ if lPriority < rPriority {
+ return true
+ }
+ if lPriority > rPriority {
+ return false
+ }
+
+ // sort based on the age
+ if !l.GetCreateTime().Equal(r.GetCreateTime()) {
+ return l.GetCreateTime().After(r.GetCreateTime())
+ }
+
+ // sort based on the allocated resource
+ lResource := l.GetAllocatedResource()
+ rResource := r.GetAllocatedResource()
+ if !resources.Equals(lResource, rResource) {
+ delta := resources.Sub(lResource, rResource)
+ return !resources.StrictlyGreaterThanZero(delta)
+ }
+ return true
+ })
+}
+
+func (p *PreemptionContext) GetVictims(sourceAsk *AllocationAsk)
[]*AllocationAsk {
Review Comment:
This should return Allocation objects, not ask objects: we pre-empt
allocations, not asks. The input should be the required resource, not the full ask.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]