wilfred-s commented on a change in pull request #332:
URL:
https://github.com/apache/incubator-yunikorn-core/pull/332#discussion_r741613094
##########
File path: pkg/plugins/types.go
##########
@@ -21,49 +21,11 @@ package plugins
import (
"sync"
- "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+ "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/api"
)
type SchedulerPlugins struct {
- predicatesPlugin PredicatesPlugin
- reconcilePlugin ReconcilePlugin
- eventPlugin EventPlugin
- schedulingStateUpdater ContainerSchedulingStateUpdater
- configPlugin ConfigurationPlugin
+ ResourceManagerCallbackPlugin api.ResourceManagerCallback
sync.RWMutex
-}
-
-// RM side implements this API when it can provide plugin for predicates.
-type PredicatesPlugin interface {
- // Run a certain set of predicate functions to determine if a proposed
allocation
- // can be allocated onto a node.
- Predicates(args *si.PredicatesArgs) error
-}
-
-type ReconcilePlugin interface {
- // RM side implements this API when it can provide plugin for
reconciling
- // Re-sync scheduler cache can sync some in-cache (yunikorn-core side)
state changes
- // to scheduler cache (shim-side), such as assumed allocations.
- ReSyncSchedulerCache(args *si.ReSyncSchedulerCacheArgs) error
-}
-
-type EventPlugin interface {
- // This plugin is responsible for transmitting events to the shim side.
- // Events can be further exposed from the shim.
- SendEvent(events []*si.EventRecord)
-}
-
-// Scheduler core can update container scheduling state to the RM,
-// the shim side can determine what to do incorporate with the scheduling state
-type ContainerSchedulingStateUpdater interface {
- // update container scheduling state to the shim side
- // this might be called even the container scheduling state is unchanged
- // the shim side cannot assume to only receive updates on state changes
- // the shim side implementation must be thread safe
- Update(request *si.UpdateContainerSchedulingStateRequest)
-}
-
-type ConfigurationPlugin interface {
- UpdateConfiguration(args *si.UpdateConfigurationRequest)
*si.UpdateConfigurationResponse
-}
+}
Review comment:
nit: missing newline
##########
File path: pkg/scheduler/context.go
##########
@@ -201,31 +201,106 @@ func (cc *ClusterContext)
processRMConfigUpdateEvent(event *rmevent.RMConfigUpda
configs.ConfigContext.Set(cc.policyGroup, conf)
}
-// Main update processing: the RM passes a large multi part update which needs
to be unravelled.
-// Order of following operations is fixed, don't change unless carefully
thought through.
-// 1) applications
-// 2) allocations on existing applications
-// 3) nodes
-// Updating allocations on existing applications requires the application to
exist.
-// Node updates include recovered nodes which are linked to applications that
must exist.
-func (cc *ClusterContext) processRMUpdateEvent(event
*rmevent.RMUpdateRequestEvent) {
+func (cc *ClusterContext) handleRMUpdateNodeEvent(event
*rmevent.RMUpdateNodeEvent) {
request := event.Request
- // 1) Add / remove app requested by RM.
- cc.processApplications(request)
- // 2) Add new request, release allocation, cancel request
- cc.processAllocations(request)
- // 3) Add / remove / update Nodes
cc.processNodes(request)
}
-func (cc *ClusterContext) processNodes(request *si.UpdateRequest) {
- // 3) Add / remove / update Nodes
- // Process add node
- if len(request.NewSchedulableNodes) > 0 {
- cc.addNodes(request)
- }
- if len(request.UpdatedNodes) > 0 {
- cc.updateNodes(request)
+func (cc *ClusterContext) processNodes(request *si.NodeRequest) {
+ // 1) Add / remove / update Nodes
+ if len(request.GetNodes()) > 0 {
+ acceptedNodes := make([]*si.AcceptedNode, 0)
+ rejectedNodes := make([]*si.RejectedNode, 0)
+ for _, nodeInfo := range request.GetNodes() {
+ switch nodeInfo.Action {
+ case si.NodeInfo_CREATE:
+ sn := objects.NewNode(nodeInfo)
Review comment:
Can we move this create into its own method? Similar to the way
`updateNode` is called for all update cases? Makes the code more readable:
```
// addNode adds a node to the cluster; it returns an error if the action
// fails, or nil on success.
func (cc *ClusterContext) addNode(nodeInfo *si.NodeInfo) error {
}
```
If the response is an error, the error message can be used in the reject
state; otherwise it is an accepted node.
##########
File path: pkg/plugins/types.go
##########
@@ -21,49 +21,11 @@ package plugins
import (
"sync"
- "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/si"
+ "github.com/apache/incubator-yunikorn-scheduler-interface/lib/go/api"
)
type SchedulerPlugins struct {
- predicatesPlugin PredicatesPlugin
- reconcilePlugin ReconcilePlugin
- eventPlugin EventPlugin
- schedulingStateUpdater ContainerSchedulingStateUpdater
- configPlugin ConfigurationPlugin
+ ResourceManagerCallbackPlugin api.ResourceManagerCallback
sync.RWMutex
-}
-
-// RM side implements this API when it can provide plugin for predicates.
-type PredicatesPlugin interface {
- // Run a certain set of predicate functions to determine if a proposed
allocation
- // can be allocated onto a node.
- Predicates(args *si.PredicatesArgs) error
-}
-
-type ReconcilePlugin interface {
- // RM side implements this API when it can provide plugin for
reconciling
- // Re-sync scheduler cache can sync some in-cache (yunikorn-core side)
state changes
- // to scheduler cache (shim-side), such as assumed allocations.
- ReSyncSchedulerCache(args *si.ReSyncSchedulerCacheArgs) error
-}
-
-type EventPlugin interface {
- // This plugin is responsible for transmitting events to the shim side.
- // Events can be further exposed from the shim.
- SendEvent(events []*si.EventRecord)
-}
-
-// Scheduler core can update container scheduling state to the RM,
-// the shim side can determine what to do incorporate with the scheduling state
-type ContainerSchedulingStateUpdater interface {
- // update container scheduling state to the shim side
- // this might be called even the container scheduling state is unchanged
- // the shim side cannot assume to only receive updates on state changes
- // the shim side implementation must be thread safe
- Update(request *si.UpdateContainerSchedulingStateRequest)
-}
-
-type ConfigurationPlugin interface {
- UpdateConfiguration(args *si.UpdateConfigurationRequest)
*si.UpdateConfigurationResponse
-}
+}
Review comment:
nit: missing newline
##########
File path: pkg/scheduler/context.go
##########
@@ -201,31 +201,106 @@ func (cc *ClusterContext)
processRMConfigUpdateEvent(event *rmevent.RMConfigUpda
configs.ConfigContext.Set(cc.policyGroup, conf)
}
-// Main update processing: the RM passes a large multi part update which needs
to be unravelled.
-// Order of following operations is fixed, don't change unless carefully
thought through.
-// 1) applications
-// 2) allocations on existing applications
-// 3) nodes
-// Updating allocations on existing applications requires the application to
exist.
-// Node updates include recovered nodes which are linked to applications that
must exist.
-func (cc *ClusterContext) processRMUpdateEvent(event
*rmevent.RMUpdateRequestEvent) {
+func (cc *ClusterContext) handleRMUpdateNodeEvent(event
*rmevent.RMUpdateNodeEvent) {
request := event.Request
- // 1) Add / remove app requested by RM.
- cc.processApplications(request)
- // 2) Add new request, release allocation, cancel request
- cc.processAllocations(request)
- // 3) Add / remove / update Nodes
cc.processNodes(request)
}
-func (cc *ClusterContext) processNodes(request *si.UpdateRequest) {
- // 3) Add / remove / update Nodes
- // Process add node
- if len(request.NewSchedulableNodes) > 0 {
- cc.addNodes(request)
- }
- if len(request.UpdatedNodes) > 0 {
- cc.updateNodes(request)
+func (cc *ClusterContext) processNodes(request *si.NodeRequest) {
+ // 1) Add / remove / update Nodes
+ if len(request.GetNodes()) > 0 {
+ acceptedNodes := make([]*si.AcceptedNode, 0)
+ rejectedNodes := make([]*si.RejectedNode, 0)
+ for _, nodeInfo := range request.GetNodes() {
+ switch nodeInfo.Action {
+ case si.NodeInfo_CREATE:
+ sn := objects.NewNode(nodeInfo)
Review comment:
Can we move this create into its own method? Similar to the way
`updateNode` is called for all update cases? Makes the code more readable:
```
// addNode adds a node to the cluster; it returns an error if the action
// fails, or nil on success.
func (cc *ClusterContext) addNode(nodeInfo *si.NodeInfo) error {
}
```
If the response is an error, the error message can be used in the reject
state; otherwise it is an accepted node.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]