This is an automated email from the ASF dual-hosted git repository. xjlgod pushed a commit to branch feature/saga in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git
The following commit(s) were added to refs/heads/feature/saga by this push: new d3a4cb16 Feature/saga interface optimization (#778) d3a4cb16 is described below commit d3a4cb16895dc3188811a9360de3023b98b1d732 Author: Jingliu <928124...@qq.com> AuthorDate: Fri May 9 19:47:09 2025 +0800 Feature/saga interface optimization (#778) feature: saga Interface optimization --- pkg/saga/statemachine/constant/constant.go | 2 + .../core/process_ctrl_statemachine_engine.go | 278 ++++++++++++++++++++- .../engine/core/statemachine_engine.go | 27 +- pkg/saga/statemachine/engine/core/utils.go | 8 + pkg/util/errors/code.go | 2 + 5 files changed, 315 insertions(+), 2 deletions(-) diff --git a/pkg/saga/statemachine/constant/constant.go b/pkg/saga/statemachine/constant/constant.go index 6032f3a4..6446f765 100644 --- a/pkg/saga/statemachine/constant/constant.go +++ b/pkg/saga/statemachine/constant/constant.go @@ -64,6 +64,8 @@ const ( VarNameCurrentCompensationHolder string = "_current_compensation_holder_" VarNameFirstCompensationStateStarted string = "_first_compensation_state_started" VarNameCurrentLoopContextHolder string = "_current_loop_context_holder_" + VarNameRetriedStateInstId string = "_retried_state_instance_id" + VarNameIsForSubStatMachineForward string = "_is_for_sub_statemachine_forward_" // TODO: this lock in process context only has one, try to add more to add concurrent VarNameProcessContextMutexLock string = "_current_context_mutex_lock" VarNameFailEndStateFlag string = "_fail_end_state_flag_" diff --git a/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go b/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go index 1b59123d..93f2dd48 100644 --- a/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go +++ b/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go @@ -41,15 +41,99 @@ func NewProcessCtrlStateMachineEngine() *ProcessCtrlStateMachineEngine { } } -func (p ProcessCtrlStateMachineEngine) Start(ctx context.Context, stateMachineName string, tenantId string, startParams map[string]interface{}) (statelang.StateMachineInstance, error) { +func (p ProcessCtrlStateMachineEngine) Start(ctx context.Context, stateMachineName string, tenantId string, + startParams map[string]interface{}) (statelang.StateMachineInstance, error) { return p.startInternal(ctx, stateMachineName, tenantId, "", startParams, false, nil) } +func (p ProcessCtrlStateMachineEngine) StartAsync(ctx context.Context, stateMachineName string, tenantId string, + startParams map[string]interface{}, callback CallBack) (statelang.StateMachineInstance, error) { + return p.startInternal(ctx, stateMachineName, tenantId, "", startParams, true, callback) +} + +func (p ProcessCtrlStateMachineEngine) StartWithBusinessKey(ctx context.Context, stateMachineName string, + tenantId string, businessKey string, startParams map[string]interface{}) (statelang.StateMachineInstance, error) { + return p.startInternal(ctx, stateMachineName, tenantId, businessKey, startParams, false, nil) +} + +func (p ProcessCtrlStateMachineEngine) StartWithBusinessKeyAsync(ctx context.Context, stateMachineName string, + tenantId string, businessKey string, startParams map[string]interface{}, callback CallBack) (statelang.StateMachineInstance, error) { + return p.startInternal(ctx, stateMachineName, tenantId, businessKey, startParams, true, callback) +} + +func (p ProcessCtrlStateMachineEngine) Forward(ctx context.Context, stateMachineInstId string, + replaceParams map[string]interface{}) (statelang.StateMachineInstance, error) { + return p.forwardInternal(ctx, stateMachineInstId, replaceParams, false, false, nil) +} + +func (p ProcessCtrlStateMachineEngine) ForwardAsync(ctx context.Context, stateMachineInstId string, replaceParams map[string]interface{}, callback CallBack) (statelang.StateMachineInstance, error) { + return p.forwardInternal(ctx, stateMachineInstId, replaceParams, false, true, callback) +} + func (p ProcessCtrlStateMachineEngine) Compensate(ctx context.Context, stateMachineInstId string, replaceParams map[string]any) (statelang.StateMachineInstance, error) { return p.compensateInternal(ctx, stateMachineInstId, replaceParams, false, nil) } +func (p ProcessCtrlStateMachineEngine) CompensateAsync(ctx context.Context, stateMachineInstId string, replaceParams map[string]interface{}, callback CallBack) (statelang.StateMachineInstance, error) { + return p.compensateInternal(ctx, stateMachineInstId, replaceParams, true, callback) +} + +func (p ProcessCtrlStateMachineEngine) SkipAndForward(ctx context.Context, stateMachineInstId string, replaceParams map[string]interface{}) (statelang.StateMachineInstance, error) { + return p.forwardInternal(ctx, stateMachineInstId, replaceParams, true, false, nil) +} + +func (p ProcessCtrlStateMachineEngine) SkipAndForwardAsync(ctx context.Context, stateMachineInstId string, callback CallBack) (statelang.StateMachineInstance, error) { + return p.forwardInternal(ctx, stateMachineInstId, nil, true, true, callback) +} + +func (p ProcessCtrlStateMachineEngine) GetStateMachineConfig() StateMachineConfig { + return p.StateMachineConfig +} + +func (p ProcessCtrlStateMachineEngine) ReloadStateMachineInstance(ctx context.Context, instId string) (statelang.StateMachineInstance, error) { + inst, err := p.StateMachineConfig.StateLogStore().GetStateMachineInstance(instId) + if err != nil { + return nil, err + } + if inst != nil { + stateMachine := inst.StateMachine() + if stateMachine == nil { + stateMachine, err = p.StateMachineConfig.StateMachineRepository().GetStateMachineById(inst.MachineID()) + if err != nil { + return nil, err + } + inst.SetStateMachine(stateMachine) + } + if stateMachine == nil { + return nil, exception.NewEngineExecutionException(seataErrors.ObjectNotExists, + "StateMachine[id:"+inst.MachineID()+"] not exist.", nil) + } + + stateList := inst.StateList() + if len(stateList) == 0 { + stateList, err = p.StateMachineConfig.StateLogStore().GetStateInstanceListByMachineInstanceId(instId) + if err != nil { + return nil, err + } + if len(stateList) > 0 { + for _, tmpStateInstance := range stateList { + inst.PutState(tmpStateInstance.ID(), tmpStateInstance) + } + } + } + + if len(inst.EndParams()) == 0 { + endParams, err := p.replayContextVariables(ctx, inst) + if err != nil { + return nil, err + } + inst.SetEndParams(endParams) + } + } + return inst, nil +} + func (p ProcessCtrlStateMachineEngine) startInternal(ctx context.Context, stateMachineName string, tenantId string, businessKey string, startParams map[string]interface{}, async bool, callback CallBack) (statelang.StateMachineInstance, error) { if tenantId == "" { @@ -104,6 +188,184 @@ func (p ProcessCtrlStateMachineEngine) startInternal(ctx context.Context, stateM return stateMachineInstance, nil } +func (p ProcessCtrlStateMachineEngine) forwardInternal(ctx context.Context, stateMachineInstId string, + replaceParams map[string]interface{}, skip bool, async bool, callback CallBack) (statelang.StateMachineInstance, error) { + stateMachineInstance, err := p.reloadStateMachineInstance(ctx, stateMachineInstId) + if err != nil { + return nil, err + } + + if stateMachineInstance == nil { + return nil, exception.NewEngineExecutionException(seataErrors.StateMachineInstanceNotExists, "StateMachineInstance is not exists", nil) + } + + if stateMachineInstance.Status() == statelang.SU && stateMachineInstance.CompensationStatus() == "" { + return stateMachineInstance, nil + } + + acceptStatus := []statelang.ExecutionStatus{statelang.FA, statelang.UN, statelang.RU} + if _, err := p.checkStatus(ctx, stateMachineInstance, acceptStatus, nil, stateMachineInstance.Status(), "", "forward"); err != nil { + return nil, err + } + + actList := stateMachineInstance.StateList() + if len(actList) == 0 { + return nil, exception.NewEngineExecutionException(seataErrors.OperationDenied, + fmt.Sprintf("StateMachineInstance[id:%s] has no stateInstance, please start a new StateMachine execution instead", stateMachineInstId), nil) + } + + lastForwardState, err := p.findOutLastForwardStateInstance(actList) + if err != nil { + return nil, err + } + if lastForwardState == nil { + return nil, exception.NewEngineExecutionException(seataErrors.OperationDenied, + fmt.Sprintf("StateMachineInstance[id:%s] Cannot find last forward execution stateInstance", stateMachineInstId), nil) + } + + contextBuilder := NewProcessContextBuilder(). + WithProcessType(process.StateLang). + WithOperationName(constant.OperationNameForward). + WithAsyncCallback(callback). + WithStateMachineInstance(stateMachineInstance). + WithStateInstance(lastForwardState). + WithStateMachineConfig(p.StateMachineConfig). + WithStateMachineEngine(p). + WithIsAsyncExecution(async) + + context := contextBuilder.Build() + + contextVariables, err := p.getStateMachineContextVariables(ctx, stateMachineInstance) + if err != nil { + return nil, err + } + + if replaceParams != nil { + for k, v := range replaceParams { + contextVariables[k] = v + } + } + p.putBusinesskeyToContextariables(stateMachineInstance, contextVariables) + + concurrentContextVariables := p.copyMap(contextVariables) + + context.SetVariable(constant.VarNameStateMachineContext, concurrentContextVariables) + stateMachineInstance.SetContext(concurrentContextVariables) + + originStateName := GetOriginStateName(lastForwardState) + lastState := stateMachineInstance.StateMachine().State(originStateName) + loop := GetLoopConfig(ctx, context, lastState) + if loop != nil && lastForwardState.Status() == statelang.SU { + lastForwardState = p.findOutLastNeedForwardStateInstance(ctx, context) + } + + context.SetVariable(lastForwardState.Name()+constant.VarNameRetriedStateInstId, lastForwardState.ID()) + if lastForwardState.Type() == constant.StateTypeSubStateMachine && lastForwardState.CompensationStatus() != statelang.SU { + context.SetVariable(constant.VarNameIsForSubStatMachineForward, true) + } + + if lastForwardState.Status() != statelang.SU { + lastForwardState.SetIgnoreStatus(true) + } + + inst := NewStateInstruction(stateMachineInstance.StateMachine().Name(), stateMachineInstance.TenantID()) + if skip || lastForwardState.Status() == statelang.SU { + next := "" + curState := stateMachineInstance.StateMachine().State(GetOriginStateName(lastForwardState)) + if taskState, ok := curState.(*state.AbstractTaskState); ok { + next = taskState.Next() + } + if next == "" { + log.Warn(fmt.Sprintf("Last Forward execution StateInstance was succeed, and it has not Next State, skip forward operation")) + return stateMachineInstance, nil + } + inst.SetStateName(next) + } else { + if lastForwardState.Status() == statelang.RU && !IsTimeout(lastForwardState.StartedTime(), p.StateMachineConfig.ServiceInvokeTimeout()) { + return nil, exception.NewEngineExecutionException(seataErrors.OperationDenied, + fmt.Sprintf("State [%s] is running, operation[forward] denied", lastForwardState.Name()), nil) + } + inst.SetStateName(GetOriginStateName(lastForwardState)) + } + context.SetInstruction(inst) + + stateMachineInstance.SetStatus(statelang.RU) + stateMachineInstance.SetRunning(true) + + log.Info(fmt.Sprintf("Operation [forward] started stateMachineInstance[id:%s]", stateMachineInstance.ID())) + + if stateMachineInstance.StateMachine().IsPersist() { + if err := p.StateMachineConfig.StateLogStore().RecordStateMachineRestarted(ctx, stateMachineInstance, context); err != nil { + return nil, err + } + } + + curState, err := inst.GetState(context) + if err != nil { + return nil, err + } + loop = GetLoopConfig(ctx, context, curState) + if loop != nil { + inst.SetTemporaryState(state.NewLoopStartStateImpl()) + } + + if async { + if _, err := p.StateMachineConfig.AsyncEventPublisher().PushEvent(ctx, context); err != nil { + return nil, err + } + } else { + if _, err := p.StateMachineConfig.EventPublisher().PushEvent(ctx, context); err != nil { + return nil, err + } + } + + return stateMachineInstance, nil +} + +func (p ProcessCtrlStateMachineEngine) findOutLastForwardStateInstance(stateInstanceList []statelang.StateInstance) (statelang.StateInstance, error) { + var lastForwardStateInstance statelang.StateInstance + var err error + for i := len(stateInstanceList) - 1; i >= 0; i-- { + stateInstance := stateInstanceList[i] + if !stateInstance.IsForCompensation() { + if stateInstance.CompensationStatus() == statelang.SU { + continue + } + + if stateInstance.Type() == constant.StateTypeSubStateMachine { + finalState := stateInstance + for finalState.StateIDRetriedFor() != "" { + if finalState, err = p.StateMachineConfig.StateLogStore().GetStateInstance(finalState.StateIDRetriedFor(), + finalState.MachineInstanceID()); err != nil { + return nil, err + } + } + + subInst, _ := p.StateMachineConfig.StateLogStore().GetStateMachineInstanceByParentId(GenerateParentId(finalState)) + if len(subInst) > 0 { + if subInst[0].CompensationStatus() == statelang.SU { + continue + } + + if subInst[0].CompensationStatus() == statelang.UN { + return nil, exception.NewEngineExecutionException(seataErrors.ForwardInvalid, + "Last forward execution state instance is SubStateMachine and compensation status is [UN], Operation[forward] denied, stateInstanceId:"+stateInstance.ID(), + nil) + } + } + } else if stateInstance.CompensationStatus() == statelang.UN { + return nil, exception.NewEngineExecutionException(seataErrors.ForwardInvalid, + "Last forward execution state instance compensation status is [UN], Operation[forward] denied, stateInstanceId:"+stateInstance.ID(), + nil) + } + + lastForwardStateInstance = stateInstance + break + } + } + return lastForwardStateInstance, nil +} + // copyMap not deep copy, so best practice: Don’t pass by reference func (p ProcessCtrlStateMachineEngine) copyMap(startParams map[string]interface{}) map[string]interface{} { copyMap := make(map[string]interface{}, len(startParams)) @@ -439,3 +701,17 @@ func (p ProcessCtrlStateMachineEngine) nullSafeCopy(srcMap map[string]any, destM } } } + +func (p ProcessCtrlStateMachineEngine) findOutLastNeedForwardStateInstance(ctx context.Context, processContext ProcessContext) statelang.StateInstance { + stateMachineInstance := processContext.GetVariable(constant.VarNameStateMachineInst).(statelang.StateMachineInstance) + lastForwardState := processContext.GetVariable(constant.VarNameStateInst).(statelang.StateInstance) + + actList := stateMachineInstance.StateList() + for i := len(actList) - 1; i >= 0; i-- { + stateInstance := actList[i] + if GetOriginStateName(stateInstance) == GetOriginStateName(lastForwardState) && stateInstance.Status() != statelang.SU { + return stateInstance + } + } + return lastForwardState +} diff --git a/pkg/saga/statemachine/engine/core/statemachine_engine.go b/pkg/saga/statemachine/engine/core/statemachine_engine.go index 75cdb1a6..6ad36204 100644 --- a/pkg/saga/statemachine/engine/core/statemachine_engine.go +++ b/pkg/saga/statemachine/engine/core/statemachine_engine.go @@ -23,8 +23,33 @@ import ( ) type StateMachineEngine interface { + // Start starts a state machine instance Start(ctx context.Context, stateMachineName string, tenantId string, startParams map[string]interface{}) (statelang.StateMachineInstance, error) - Compensate(ctx context.Context, stateMachineInstId string, replaceParams map[string]any) (statelang.StateMachineInstance, error) + // StartAsync start a state machine instance asynchronously + StartAsync(ctx context.Context, stateMachineName string, tenantId string, startParams map[string]interface{}, + callback CallBack) (statelang.StateMachineInstance, error) + // StartWithBusinessKey starts a state machine instance with a business key + StartWithBusinessKey(ctx context.Context, stateMachineName string, tenantId string, businessKey string, + startParams map[string]interface{}) (statelang.StateMachineInstance, error) + // StartWithBusinessKeyAsync starts a state machine instance with a business key asynchronously + StartWithBusinessKeyAsync(ctx context.Context, stateMachineName string, tenantId string, businessKey string, + startParams map[string]interface{}, callback CallBack) (statelang.StateMachineInstance, error) + // Forward restart a failed state machine instance + Forward(ctx context.Context, stateMachineInstId string, replaceParams map[string]interface{}) (statelang.StateMachineInstance, error) + // ForwardAsync restart a failed state machine instance asynchronously + ForwardAsync(ctx context.Context, stateMachineInstId string, replaceParams map[string]interface{}, callback CallBack) (statelang.StateMachineInstance, error) + // Compensate compensate a state machine instance + Compensate(ctx context.Context, stateMachineInstId string, replaceParams map[string]interface{}) (statelang.StateMachineInstance, error) + // CompensateAsync compensate a state machine instance asynchronously + CompensateAsync(ctx context.Context, stateMachineInstId string, replaceParams map[string]interface{}, callback CallBack) (statelang.StateMachineInstance, error) + // SkipAndForward skips the current failed state instance and restarts the state machine instance + SkipAndForward(ctx context.Context, stateMachineInstId string, replaceParams map[string]interface{}) (statelang.StateMachineInstance, error) + // SkipAndForwardAsync skips the current failed state instance and restarts the state machine instance asynchronously + SkipAndForwardAsync(ctx context.Context, stateMachineInstId string, callback CallBack) (statelang.StateMachineInstance, error) + // GetStateMachineConfig gets the state machine configurations + GetStateMachineConfig() StateMachineConfig + // ReloadStateMachineInstance reloads a state machine instance + ReloadStateMachineInstance(ctx context.Context, instId string) (statelang.StateMachineInstance, error) } type CallBack interface { diff --git a/pkg/saga/statemachine/engine/core/utils.go b/pkg/saga/statemachine/engine/core/utils.go index 49f03e00..afa1fd66 100644 --- a/pkg/saga/statemachine/engine/core/utils.go +++ b/pkg/saga/statemachine/engine/core/utils.go @@ -98,6 +98,14 @@ func (p *ProcessContextBuilder) WithIsAsyncExecution(async bool) *ProcessContext return p } +func (p *ProcessContextBuilder) WithStateInstance(state statelang.StateInstance) *ProcessContextBuilder { + if state != nil { + p.processContext.SetVariable(constant.VarNameStateInst, state) + } + + return p +} + func (p *ProcessContextBuilder) Build() ProcessContext { return p.processContext } diff --git a/pkg/util/errors/code.go b/pkg/util/errors/code.go index 9b15e72a..5d265df7 100644 --- a/pkg/util/errors/code.go +++ b/pkg/util/errors/code.go @@ -111,4 +111,6 @@ const ( InvalidParameter // OperationDenied Operation denied OperationDenied + // ForwardInvalid Forward invalid + ForwardInvalid ) --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org