chia7712 commented on code in PR #900:
URL: https://github.com/apache/yunikorn-core/pull/900#discussion_r1666430176
##########
pkg/scheduler/partition.go:
##########
@@ -1239,56 +1239,55 @@ func (pc *PartitionContext)
calculateNodesResourceUsage() map[string][]int {
return mapResult
}
-// removeAllocation removes the referenced allocation(s) from the applications
and nodes
-// NOTE: this is a lock free call. It must NOT be called holding the
PartitionContext lock.
-func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease)
([]*objects.Allocation, *objects.Allocation) {
- if release == nil {
- return nil, nil
- }
- appID := release.ApplicationID
- allocationKey := release.GetAllocationKey()
- app := pc.getApplication(appID)
- // no app nothing to do everything should already be clean
- if app == nil {
- log.Log(log.SchedPartition).Info("Application not found while
releasing allocation",
- zap.String("appID", appID),
- zap.String("allocationKey", allocationKey),
- zap.Stringer("terminationType",
release.TerminationType))
- return nil, nil
- }
- // Processing a removal while in the Completing state could race with
the state change.
- // The race occurs between removing the allocation and updating the
queue after node processing.
- // If the state change removes the queue link before we get to updating
the queue after the node we
- // leave the resources as allocated on the queue. The queue cannot be
removed yet at this point as
- // there are still allocations left. So retrieve the queue early to
sidestep the race.
- queue := app.GetQueue()
- // temp store for allocations manipulated
+func (pc *PartitionContext) generateReleased(release *si.AllocationRelease,
app *objects.Application) []*objects.Allocation {
released := make([]*objects.Allocation, 0)
- var confirmed *objects.Allocation
- // when allocationKey is not specified, remove all allocations from the
app
+ allocationKey := release.GetAllocationKey()
if allocationKey == "" {
log.Log(log.SchedPartition).Info("remove all allocations",
- zap.String("appID", appID))
+ zap.String("appID", app.ApplicationID))
released = append(released, app.RemoveAllAllocations()...)
} else {
- // if we have an allocationKey the termination type is important
Review Comment:
Please keep this comment ("if we have an allocationKey the termination type is important") in the refactored code.
##########
pkg/scheduler/partition.go:
##########
@@ -1239,56 +1239,55 @@ func (pc *PartitionContext)
calculateNodesResourceUsage() map[string][]int {
return mapResult
}
-// removeAllocation removes the referenced allocation(s) from the applications
and nodes
-// NOTE: this is a lock free call. It must NOT be called holding the
PartitionContext lock.
-func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease)
([]*objects.Allocation, *objects.Allocation) {
- if release == nil {
- return nil, nil
- }
- appID := release.ApplicationID
- allocationKey := release.GetAllocationKey()
- app := pc.getApplication(appID)
- // no app nothing to do everything should already be clean
- if app == nil {
- log.Log(log.SchedPartition).Info("Application not found while
releasing allocation",
- zap.String("appID", appID),
- zap.String("allocationKey", allocationKey),
- zap.Stringer("terminationType",
release.TerminationType))
- return nil, nil
- }
- // Processing a removal while in the Completing state could race with
the state change.
Review Comment:
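Please keep this comment block explaining how processing a removal in the Completing state could race with the state change.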
##########
pkg/scheduler/objects/application_state.go:
##########
@@ -90,141 +90,149 @@ func (as applicationState) String() string {
return [...]string{"New", "Accepted", "Running", "Rejected",
"Completing", "Completed", "Failing", "Failed", "Expired", "Resuming"}[as]
}
-func NewAppState() *fsm.FSM {
- return fsm.NewFSM(
- New.String(), fsm.Events{
- {
- Name: RejectApplication.String(),
- Src: []string{New.String()},
- Dst: Rejected.String(),
- }, {
- Name: RunApplication.String(),
- Src: []string{New.String(), Resuming.String()},
- Dst: Accepted.String(),
- }, {
- Name: RunApplication.String(),
- Src: []string{Accepted.String(),
Running.String(), Completing.String()},
- Dst: Running.String(),
- }, {
- Name: CompleteApplication.String(),
- Src: []string{Accepted.String(),
Running.String()},
- Dst: Completing.String(),
- }, {
- Name: CompleteApplication.String(),
- Src: []string{Completing.String()},
- Dst: Completed.String(),
- }, {
- Name: FailApplication.String(),
- Src: []string{New.String(), Accepted.String(),
Running.String()},
- Dst: Failing.String(),
- }, {
- Name: FailApplication.String(),
- Src: []string{Failing.String()},
- Dst: Failed.String(),
- }, {
- Name: ResumeApplication.String(),
- Src: []string{New.String(), Accepted.String()},
- Dst: Resuming.String(),
- }, {
- Name: ExpireApplication.String(),
- Src: []string{Completed.String(),
Failed.String(), Rejected.String()},
- Dst: Expired.String(),
- },
- },
- fsm.Callbacks{
- // The state machine is tightly tied to the Application
object.
- //
- // The first argument must always be an Application and
if there is a second,
- // that must be a string. If this precondition is not
met, a runtime panic
- // will occur.
- "enter_state": func(_ context.Context, event
*fsm.Event) {
- app := event.Args[0].(*Application)
//nolint:errcheck
- log.Log(log.SchedFSM).Info("Application state
transition",
- zap.String("appID", app.ApplicationID),
- zap.String("source", event.Src),
- zap.String("destination", event.Dst),
- zap.String("event", event.Event))
-
- eventInfo := ""
- if len(event.Args) == 2 {
- eventInfo = event.Args[1].(string)
//nolint:errcheck
- app.OnStateChange(event, eventInfo)
- } else {
- app.OnStateChange(event, "")
- }
- eventDetails, ok := stateEvents[event.Dst]
- if !ok {
- log.Log(log.SchedFSM).Error("event
details not found",
- zap.String("state", event.Dst))
- return
- }
- if app.sendStateChangeEvents {
-
app.appEvents.SendStateChangeEvent(app.ApplicationID, eventDetails, eventInfo)
- }
- },
- "leave_state": func(_ context.Context, event
*fsm.Event) {
- event.Args[0].(*Application).clearStateTimer()
//nolint:errcheck
- },
- fmt.Sprintf("enter_%s", Completing.String()): func(_
context.Context, event *fsm.Event) {
- app := event.Args[0].(*Application)
//nolint:errcheck
- app.setStateTimer(completingTimeout,
app.stateMachine.Current(), CompleteApplication)
- },
- fmt.Sprintf("leave_%s", New.String()): func(_
context.Context, event *fsm.Event) {
- if event.Dst != Rejected.String() {
- app := event.Args[0].(*Application)
//nolint:errcheck
-
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsAccepted()
-
metrics.GetSchedulerMetrics().IncTotalApplicationsAccepted()
- }
- },
- fmt.Sprintf("enter_%s", Rejected.String()): func(_
context.Context, event *fsm.Event) {
- app := event.Args[0].(*Application)
//nolint:errcheck
-
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsRejected()
-
metrics.GetSchedulerMetrics().IncTotalApplicationsRejected()
- app.setStateTimer(terminatedTimeout,
app.stateMachine.Current(), ExpireApplication)
- app.finishedTime = time.Now()
- app.cleanupTrackedResource()
- // No rejected message when use
app.HandleApplicationEvent(RejectApplication)
- if len(event.Args) == 2 {
- app.rejectedMessage =
event.Args[1].(string) //nolint:errcheck
- }
- },
- fmt.Sprintf("enter_%s", Running.String()): func(_
context.Context, event *fsm.Event) {
- if event.Src != Running.String() {
- app := event.Args[0].(*Application)
//nolint:errcheck
- app.startTime = time.Now()
-
app.queue.incRunningApps(app.ApplicationID)
-
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsRunning()
-
metrics.GetSchedulerMetrics().IncTotalApplicationsRunning()
- }
- },
- fmt.Sprintf("leave_%s", Running.String()): func(_
context.Context, event *fsm.Event) {
- if event.Dst != Running.String() {
- app := event.Args[0].(*Application)
//nolint:errcheck
- app.queue.decRunningApps()
-
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning()
-
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
- }
- },
- fmt.Sprintf("enter_%s", Completed.String()): func(_
context.Context, event *fsm.Event) {
+func eventDesc() fsm.Events {
+ return fsm.Events{
+ {
Review Comment:
Please keep the original formatting of the event entries to reduce the size of the diff:
```go
{
Name: RejectApplication.String(),
Src: []string{New.String()},
Dst: Rejected.String(),
}, {
Name: RunApplication.String(),
Src: []string{New.String(), Resuming.String()},
Dst: Accepted.String(),
}, {
Name: RunApplication.String(),
Src: []string{Accepted.String(),
Running.String(), Completing.String()},
Dst: Running.String(),
}, {
Name: CompleteApplication.String(),
Src: []string{Accepted.String(),
Running.String()},
Dst: Completing.String(),
}, {
Name: CompleteApplication.String(),
Src: []string{Completing.String()},
Dst: Completed.String(),
}, {
Name: FailApplication.String(),
Src: []string{New.String(), Accepted.String(),
Running.String()},
Dst: Failing.String(),
}, {
Name: FailApplication.String(),
Src: []string{Failing.String()},
Dst: Failed.String(),
}, {
Name: ResumeApplication.String(),
Src: []string{New.String(), Accepted.String()},
Dst: Resuming.String(),
}, {
Name: ExpireApplication.String(),
Src: []string{Completed.String(),
Failed.String(), Rejected.String()},
Dst: Expired.String(),
},
```
##########
pkg/scheduler/objects/application_state.go:
##########
@@ -90,141 +90,149 @@ func (as applicationState) String() string {
return [...]string{"New", "Accepted", "Running", "Rejected",
"Completing", "Completed", "Failing", "Failed", "Expired", "Resuming"}[as]
}
-func NewAppState() *fsm.FSM {
- return fsm.NewFSM(
- New.String(), fsm.Events{
- {
- Name: RejectApplication.String(),
- Src: []string{New.String()},
- Dst: Rejected.String(),
- }, {
- Name: RunApplication.String(),
- Src: []string{New.String(), Resuming.String()},
- Dst: Accepted.String(),
- }, {
- Name: RunApplication.String(),
- Src: []string{Accepted.String(),
Running.String(), Completing.String()},
- Dst: Running.String(),
- }, {
- Name: CompleteApplication.String(),
- Src: []string{Accepted.String(),
Running.String()},
- Dst: Completing.String(),
- }, {
- Name: CompleteApplication.String(),
- Src: []string{Completing.String()},
- Dst: Completed.String(),
- }, {
- Name: FailApplication.String(),
- Src: []string{New.String(), Accepted.String(),
Running.String()},
- Dst: Failing.String(),
- }, {
- Name: FailApplication.String(),
- Src: []string{Failing.String()},
- Dst: Failed.String(),
- }, {
- Name: ResumeApplication.String(),
- Src: []string{New.String(), Accepted.String()},
- Dst: Resuming.String(),
- }, {
- Name: ExpireApplication.String(),
- Src: []string{Completed.String(),
Failed.String(), Rejected.String()},
- Dst: Expired.String(),
- },
- },
- fsm.Callbacks{
- // The state machine is tightly tied to the Application
object.
Review Comment:
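Please keep this comment (the note that the state machine is tightly tied to the Application object and its argument preconditions).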
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]