This is an automated email from the ASF dual-hosted git repository.

xjlgod pushed a commit to branch feature/saga
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git


The following commit(s) were added to refs/heads/feature/saga by this push:
     new d3a4cb16 Feature/saga interface optimization (#778)
d3a4cb16 is described below

commit d3a4cb16895dc3188811a9360de3023b98b1d732
Author: Jingliu <928124...@qq.com>
AuthorDate: Fri May 9 19:47:09 2025 +0800

    Feature/saga interface optimization (#778)
    
    feature: saga Interface optimization
---
 pkg/saga/statemachine/constant/constant.go         |   2 +
 .../core/process_ctrl_statemachine_engine.go       | 278 ++++++++++++++++++++-
 .../engine/core/statemachine_engine.go             |  27 +-
 pkg/saga/statemachine/engine/core/utils.go         |   8 +
 pkg/util/errors/code.go                            |   2 +
 5 files changed, 315 insertions(+), 2 deletions(-)

diff --git a/pkg/saga/statemachine/constant/constant.go 
b/pkg/saga/statemachine/constant/constant.go
index 6032f3a4..6446f765 100644
--- a/pkg/saga/statemachine/constant/constant.go
+++ b/pkg/saga/statemachine/constant/constant.go
@@ -64,6 +64,8 @@ const (
        VarNameCurrentCompensationHolder     string = 
"_current_compensation_holder_"
        VarNameFirstCompensationStateStarted string = 
"_first_compensation_state_started"
        VarNameCurrentLoopContextHolder      string = 
"_current_loop_context_holder_"
+       VarNameRetriedStateInstId            string = 
"_retried_state_instance_id"
+       VarNameIsForSubStatMachineForward    string = 
"_is_for_sub_statemachine_forward_"
        // TODO: this lock in process context only has one, try to add more to 
add concurrent
        VarNameProcessContextMutexLock string = "_current_context_mutex_lock"
        VarNameFailEndStateFlag        string = "_fail_end_state_flag_"
diff --git 
a/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go 
b/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go
index 1b59123d..93f2dd48 100644
--- a/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go
+++ b/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go
@@ -41,15 +41,99 @@ func NewProcessCtrlStateMachineEngine() 
*ProcessCtrlStateMachineEngine {
        }
 }
 
-func (p ProcessCtrlStateMachineEngine) Start(ctx context.Context, 
stateMachineName string, tenantId string, startParams map[string]interface{}) 
(statelang.StateMachineInstance, error) {
+// Start starts a new instance of the named state machine. It delegates to
+// startInternal with an empty business key, the async flag off and no callback.
+func (p ProcessCtrlStateMachineEngine) Start(ctx context.Context, 
stateMachineName string, tenantId string,
+       startParams map[string]interface{}) (statelang.StateMachineInstance, 
error) {
        return p.startInternal(ctx, stateMachineName, tenantId, "", 
startParams, false, nil)
 }
 
+func (p ProcessCtrlStateMachineEngine) StartAsync(ctx context.Context, 
stateMachineName string, tenantId string,
+       startParams map[string]interface{}, callback CallBack) 
(statelang.StateMachineInstance, error) {
+       return p.startInternal(ctx, stateMachineName, tenantId, "", 
startParams, true, callback)
+}
+
+func (p ProcessCtrlStateMachineEngine) StartWithBusinessKey(ctx 
context.Context, stateMachineName string,
+       tenantId string, businessKey string, startParams 
map[string]interface{}) (statelang.StateMachineInstance, error) {
+       return p.startInternal(ctx, stateMachineName, tenantId, businessKey, 
startParams, false, nil)
+}
+
+func (p ProcessCtrlStateMachineEngine) StartWithBusinessKeyAsync(ctx 
context.Context, stateMachineName string,
+       tenantId string, businessKey string, startParams 
map[string]interface{}, callback CallBack) (statelang.StateMachineInstance, 
error) {
+       return p.startInternal(ctx, stateMachineName, tenantId, businessKey, 
startParams, true, callback)
+}
+
+func (p ProcessCtrlStateMachineEngine) Forward(ctx context.Context, 
stateMachineInstId string,
+       replaceParams map[string]interface{}) (statelang.StateMachineInstance, 
error) {
+       return p.forwardInternal(ctx, stateMachineInstId, replaceParams, false, 
false, nil)
+}
+
+func (p ProcessCtrlStateMachineEngine) ForwardAsync(ctx context.Context, 
stateMachineInstId string, replaceParams map[string]interface{}, callback 
CallBack) (statelang.StateMachineInstance, error) {
+       return p.forwardInternal(ctx, stateMachineInstId, replaceParams, false, 
true, callback)
+}
+
+// Compensate compensates the state machine instance identified by
+// stateMachineInstId. It delegates to compensateInternal, passing false and a
+// nil callback (presumably the synchronous variant; compare CompensateAsync,
+// which passes true and a callback).
 func (p ProcessCtrlStateMachineEngine) Compensate(ctx context.Context, 
stateMachineInstId string,
        replaceParams map[string]any) (statelang.StateMachineInstance, error) {
        return p.compensateInternal(ctx, stateMachineInstId, replaceParams, 
false, nil)
 }
 
+func (p ProcessCtrlStateMachineEngine) CompensateAsync(ctx context.Context, 
stateMachineInstId string, replaceParams map[string]interface{}, callback 
CallBack) (statelang.StateMachineInstance, error) {
+       return p.compensateInternal(ctx, stateMachineInstId, replaceParams, 
true, callback)
+}
+
+func (p ProcessCtrlStateMachineEngine) SkipAndForward(ctx context.Context, 
stateMachineInstId string, replaceParams map[string]interface{}) 
(statelang.StateMachineInstance, error) {
+       return p.forwardInternal(ctx, stateMachineInstId, replaceParams, true, 
false, nil)
+}
+
+func (p ProcessCtrlStateMachineEngine) SkipAndForwardAsync(ctx 
context.Context, stateMachineInstId string, callback CallBack) 
(statelang.StateMachineInstance, error) {
+       return p.forwardInternal(ctx, stateMachineInstId, nil, true, true, 
callback)
+}
+
+// GetStateMachineConfig returns the StateMachineConfig held by this engine.
+func (p ProcessCtrlStateMachineEngine) GetStateMachineConfig() StateMachineConfig {
+       config := p.StateMachineConfig
+       return config
+}
+
+func (p ProcessCtrlStateMachineEngine) ReloadStateMachineInstance(ctx 
context.Context, instId string) (statelang.StateMachineInstance, error) {
+       inst, err := 
p.StateMachineConfig.StateLogStore().GetStateMachineInstance(instId)
+       if err != nil {
+               return nil, err
+       }
+       if inst != nil {
+               stateMachine := inst.StateMachine()
+               if stateMachine == nil {
+                       stateMachine, err = 
p.StateMachineConfig.StateMachineRepository().GetStateMachineById(inst.MachineID())
+                       if err != nil {
+                               return nil, err
+                       }
+                       inst.SetStateMachine(stateMachine)
+               }
+               if stateMachine == nil {
+                       return nil, 
exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
+                               "StateMachine[id:"+inst.MachineID()+"] not 
exist.", nil)
+               }
+
+               stateList := inst.StateList()
+               if len(stateList) == 0 {
+                       stateList, err = 
p.StateMachineConfig.StateLogStore().GetStateInstanceListByMachineInstanceId(instId)
+                       if err != nil {
+                               return nil, err
+                       }
+                       if len(stateList) > 0 {
+                               for _, tmpStateInstance := range stateList {
+                                       inst.PutState(tmpStateInstance.ID(), 
tmpStateInstance)
+                               }
+                       }
+               }
+
+               if len(inst.EndParams()) == 0 {
+                       endParams, err := p.replayContextVariables(ctx, inst)
+                       if err != nil {
+                               return nil, err
+                       }
+                       inst.SetEndParams(endParams)
+               }
+       }
+       return inst, nil
+}
+
 func (p ProcessCtrlStateMachineEngine) startInternal(ctx context.Context, 
stateMachineName string, tenantId string,
        businessKey string, startParams map[string]interface{}, async bool, 
callback CallBack) (statelang.StateMachineInstance, error) {
        if tenantId == "" {
@@ -104,6 +188,184 @@ func (p ProcessCtrlStateMachineEngine) startInternal(ctx 
context.Context, stateM
        return stateMachineInstance, nil
 }
 
+func (p ProcessCtrlStateMachineEngine) forwardInternal(ctx context.Context, 
stateMachineInstId string,
+       replaceParams map[string]interface{}, skip bool, async bool, callback 
CallBack) (statelang.StateMachineInstance, error) {
+       stateMachineInstance, err := p.reloadStateMachineInstance(ctx, 
stateMachineInstId)
+       if err != nil {
+               return nil, err
+       }
+
+       if stateMachineInstance == nil {
+               return nil, 
exception.NewEngineExecutionException(seataErrors.StateMachineInstanceNotExists,
 "StateMachineInstance is not exists", nil)
+       }
+
+       if stateMachineInstance.Status() == statelang.SU && 
stateMachineInstance.CompensationStatus() == "" {
+               return stateMachineInstance, nil
+       }
+
+       acceptStatus := []statelang.ExecutionStatus{statelang.FA, statelang.UN, 
statelang.RU}
+       if _, err := p.checkStatus(ctx, stateMachineInstance, acceptStatus, 
nil, stateMachineInstance.Status(), "", "forward"); err != nil {
+               return nil, err
+       }
+
+       actList := stateMachineInstance.StateList()
+       if len(actList) == 0 {
+               return nil, 
exception.NewEngineExecutionException(seataErrors.OperationDenied,
+                       fmt.Sprintf("StateMachineInstance[id:%s] has no 
stateInstance, please start a new StateMachine execution instead", 
stateMachineInstId), nil)
+       }
+
+       lastForwardState, err := p.findOutLastForwardStateInstance(actList)
+       if err != nil {
+               return nil, err
+       }
+       if lastForwardState == nil {
+               return nil, 
exception.NewEngineExecutionException(seataErrors.OperationDenied,
+                       fmt.Sprintf("StateMachineInstance[id:%s] Cannot find 
last forward execution stateInstance", stateMachineInstId), nil)
+       }
+
+       contextBuilder := NewProcessContextBuilder().
+               WithProcessType(process.StateLang).
+               WithOperationName(constant.OperationNameForward).
+               WithAsyncCallback(callback).
+               WithStateMachineInstance(stateMachineInstance).
+               WithStateInstance(lastForwardState).
+               WithStateMachineConfig(p.StateMachineConfig).
+               WithStateMachineEngine(p).
+               WithIsAsyncExecution(async)
+
+       context := contextBuilder.Build()
+
+       contextVariables, err := p.getStateMachineContextVariables(ctx, 
stateMachineInstance)
+       if err != nil {
+               return nil, err
+       }
+
+       if replaceParams != nil {
+               for k, v := range replaceParams {
+                       contextVariables[k] = v
+               }
+       }
+       p.putBusinesskeyToContextariables(stateMachineInstance, 
contextVariables)
+
+       concurrentContextVariables := p.copyMap(contextVariables)
+
+       context.SetVariable(constant.VarNameStateMachineContext, 
concurrentContextVariables)
+       stateMachineInstance.SetContext(concurrentContextVariables)
+
+       originStateName := GetOriginStateName(lastForwardState)
+       lastState := stateMachineInstance.StateMachine().State(originStateName)
+       loop := GetLoopConfig(ctx, context, lastState)
+       if loop != nil && lastForwardState.Status() == statelang.SU {
+               lastForwardState = p.findOutLastNeedForwardStateInstance(ctx, 
context)
+       }
+
+       
context.SetVariable(lastForwardState.Name()+constant.VarNameRetriedStateInstId, 
lastForwardState.ID())
+       if lastForwardState.Type() == constant.StateTypeSubStateMachine && 
lastForwardState.CompensationStatus() != statelang.SU {
+               context.SetVariable(constant.VarNameIsForSubStatMachineForward, 
true)
+       }
+
+       if lastForwardState.Status() != statelang.SU {
+               lastForwardState.SetIgnoreStatus(true)
+       }
+
+       inst := NewStateInstruction(stateMachineInstance.StateMachine().Name(), 
stateMachineInstance.TenantID())
+       if skip || lastForwardState.Status() == statelang.SU {
+               next := ""
+               curState := 
stateMachineInstance.StateMachine().State(GetOriginStateName(lastForwardState))
+               if taskState, ok := curState.(*state.AbstractTaskState); ok {
+                       next = taskState.Next()
+               }
+               if next == "" {
+                       log.Warn(fmt.Sprintf("Last Forward execution 
StateInstance was succeed, and it has not Next State, skip forward operation"))
+                       return stateMachineInstance, nil
+               }
+               inst.SetStateName(next)
+       } else {
+               if lastForwardState.Status() == statelang.RU && 
!IsTimeout(lastForwardState.StartedTime(), 
p.StateMachineConfig.ServiceInvokeTimeout()) {
+                       return nil, 
exception.NewEngineExecutionException(seataErrors.OperationDenied,
+                               fmt.Sprintf("State [%s] is running, 
operation[forward] denied", lastForwardState.Name()), nil)
+               }
+               inst.SetStateName(GetOriginStateName(lastForwardState))
+       }
+       context.SetInstruction(inst)
+
+       stateMachineInstance.SetStatus(statelang.RU)
+       stateMachineInstance.SetRunning(true)
+
+       log.Info(fmt.Sprintf("Operation [forward] started  
stateMachineInstance[id:%s]", stateMachineInstance.ID()))
+
+       if stateMachineInstance.StateMachine().IsPersist() {
+               if err := 
p.StateMachineConfig.StateLogStore().RecordStateMachineRestarted(ctx, 
stateMachineInstance, context); err != nil {
+                       return nil, err
+               }
+       }
+
+       curState, err := inst.GetState(context)
+       if err != nil {
+               return nil, err
+       }
+       loop = GetLoopConfig(ctx, context, curState)
+       if loop != nil {
+               inst.SetTemporaryState(state.NewLoopStartStateImpl())
+       }
+
+       if async {
+               if _, err := 
p.StateMachineConfig.AsyncEventPublisher().PushEvent(ctx, context); err != nil {
+                       return nil, err
+               }
+       } else {
+               if _, err := 
p.StateMachineConfig.EventPublisher().PushEvent(ctx, context); err != nil {
+                       return nil, err
+               }
+       }
+
+       return stateMachineInstance, nil
+}
+
+// findOutLastForwardStateInstance walks stateInstanceList from the end and
+// returns the last state instance that forward may continue from.
+//
+// Skipped while searching: instances that are for compensation, instances whose
+// compensation status is SU, and sub-state-machine instances whose first child
+// state machine instance has compensation status SU.
+//
+// A ForwardInvalid engine exception is returned when the candidate (or, for a
+// sub state machine, its first child instance) has compensation status UN.
+// Errors from the state log store are returned as-is. (nil, nil) means no
+// candidate was found.
+func (p ProcessCtrlStateMachineEngine) findOutLastForwardStateInstance(stateInstanceList []statelang.StateInstance) (statelang.StateInstance, error) {
+       var err error
+       for i := len(stateInstanceList) - 1; i >= 0; i-- {
+               stateInstance := stateInstanceList[i]
+               if stateInstance.IsForCompensation() || stateInstance.CompensationStatus() == statelang.SU {
+                       continue
+               }
+
+               if stateInstance.Type() != constant.StateTypeSubStateMachine {
+                       if stateInstance.CompensationStatus() == statelang.UN {
+                               return nil, exception.NewEngineExecutionException(seataErrors.ForwardInvalid,
+                                       "Last forward execution state instance compensation status is [UN], Operation[forward] denied, stateInstanceId:"+stateInstance.ID(),
+                                       nil)
+                       }
+                       return stateInstance, nil
+               }
+
+               // Follow the StateIDRetriedFor chain until an instance that is not a retry.
+               // NOTE(review): assumes GetStateInstance never returns (nil, nil); confirm.
+               finalState := stateInstance
+               for finalState.StateIDRetriedFor() != "" {
+                       finalState, err = p.StateMachineConfig.StateLogStore().GetStateInstance(finalState.StateIDRetriedFor(),
+                               finalState.MachineInstanceID())
+                       if err != nil {
+                               return nil, err
+                       }
+               }
+
+               // A failed lookup must not be read as "no sub state machine": that would
+               // let forward continue without checking the child's compensation status.
+               subInst, err := p.StateMachineConfig.StateLogStore().GetStateMachineInstanceByParentId(GenerateParentId(finalState))
+               if err != nil {
+                       return nil, err
+               }
+               if len(subInst) > 0 {
+                       switch subInst[0].CompensationStatus() {
+                       case statelang.SU:
+                               continue
+                       case statelang.UN:
+                               return nil, exception.NewEngineExecutionException(seataErrors.ForwardInvalid,
+                                       "Last forward execution state instance is SubStateMachine and compensation status is [UN], Operation[forward] denied, stateInstanceId:"+stateInstance.ID(),
+                                       nil)
+                       }
+               }
+
+               return stateInstance, nil
+       }
+       return nil, nil
+}
+
 // copyMap not deep copy, so best practice: Don’t pass by reference
 func (p ProcessCtrlStateMachineEngine) copyMap(startParams 
map[string]interface{}) map[string]interface{} {
        copyMap := make(map[string]interface{}, len(startParams))
@@ -439,3 +701,17 @@ func (p ProcessCtrlStateMachineEngine) nullSafeCopy(srcMap 
map[string]any, destM
                }
        }
 }
+
+// findOutLastNeedForwardStateInstance reads the state machine instance and the
+// current state instance from processContext, then scans the instance's state
+// list from the end for the last state instance that has the same origin state
+// name as the current one and whose status is not SU. When none matches, the
+// current state instance is returned.
+//
+// Both context variables are read with unchecked type assertions, so this
+// panics if either is missing or of another type. ctx is unused.
+func (p ProcessCtrlStateMachineEngine) findOutLastNeedForwardStateInstance(ctx context.Context, processContext ProcessContext) statelang.StateInstance {
+       stateMachineInstance := processContext.GetVariable(constant.VarNameStateMachineInst).(statelang.StateMachineInstance)
+       lastForwardState := processContext.GetVariable(constant.VarNameStateInst).(statelang.StateInstance)
+
+       // Loop-invariant: resolve the origin state name once.
+       wantedOrigin := GetOriginStateName(lastForwardState)
+
+       actList := stateMachineInstance.StateList()
+       for i := len(actList) - 1; i >= 0; i-- {
+               candidate := actList[i]
+               if candidate.Status() != statelang.SU && GetOriginStateName(candidate) == wantedOrigin {
+                       return candidate
+               }
+       }
+       return lastForwardState
+}
diff --git a/pkg/saga/statemachine/engine/core/statemachine_engine.go 
b/pkg/saga/statemachine/engine/core/statemachine_engine.go
index 75cdb1a6..6ad36204 100644
--- a/pkg/saga/statemachine/engine/core/statemachine_engine.go
+++ b/pkg/saga/statemachine/engine/core/statemachine_engine.go
@@ -23,8 +23,33 @@ import (
 )
 
+// StateMachineEngine is the set of operations for starting, forwarding,
+// compensating and reloading state machine instances.
 type StateMachineEngine interface {
+       // Start starts a state machine instance
        Start(ctx context.Context, stateMachineName string, tenantId string, 
startParams map[string]interface{}) (statelang.StateMachineInstance, error)
-       Compensate(ctx context.Context, stateMachineInstId string, 
replaceParams map[string]any) (statelang.StateMachineInstance, error)
+       // StartAsync starts a state machine instance asynchronously
+       StartAsync(ctx context.Context, stateMachineName string, tenantId 
string, startParams map[string]interface{},
+               callback CallBack) (statelang.StateMachineInstance, error)
+       // StartWithBusinessKey starts a state machine instance with a business 
key
+       StartWithBusinessKey(ctx context.Context, stateMachineName string, 
tenantId string, businessKey string,
+               startParams map[string]interface{}) 
(statelang.StateMachineInstance, error)
+       // StartWithBusinessKeyAsync starts a state machine instance with a 
business key asynchronously
+       StartWithBusinessKeyAsync(ctx context.Context, stateMachineName string, 
tenantId string, businessKey string,
+               startParams map[string]interface{}, callback CallBack) 
(statelang.StateMachineInstance, error)
+       // Forward restarts a failed state machine instance
+       Forward(ctx context.Context, stateMachineInstId string, replaceParams 
map[string]interface{}) (statelang.StateMachineInstance, error)
+       // ForwardAsync restarts a failed state machine instance asynchronously
+       ForwardAsync(ctx context.Context, stateMachineInstId string, 
replaceParams map[string]interface{}, callback CallBack) 
(statelang.StateMachineInstance, error)
+       // Compensate compensates a state machine instance
+       Compensate(ctx context.Context, stateMachineInstId string, 
replaceParams map[string]interface{}) (statelang.StateMachineInstance, error)
+       // CompensateAsync compensates a state machine instance asynchronously
+       CompensateAsync(ctx context.Context, stateMachineInstId string, 
replaceParams map[string]interface{}, callback CallBack) 
(statelang.StateMachineInstance, error)
+       // SkipAndForward skips the current failed state instance and restarts 
the state machine instance
+       SkipAndForward(ctx context.Context, stateMachineInstId string, 
replaceParams map[string]interface{}) (statelang.StateMachineInstance, error)
+       // SkipAndForwardAsync skips the current failed state instance and 
restarts the state machine instance asynchronously
+       SkipAndForwardAsync(ctx context.Context, stateMachineInstId string, 
callback CallBack) (statelang.StateMachineInstance, error)
+       // GetStateMachineConfig gets the state machine configurations
+       GetStateMachineConfig() StateMachineConfig
+       // ReloadStateMachineInstance reloads a state machine instance
+       ReloadStateMachineInstance(ctx context.Context, instId string) 
(statelang.StateMachineInstance, error)
 }
 
 type CallBack interface {
diff --git a/pkg/saga/statemachine/engine/core/utils.go 
b/pkg/saga/statemachine/engine/core/utils.go
index 49f03e00..afa1fd66 100644
--- a/pkg/saga/statemachine/engine/core/utils.go
+++ b/pkg/saga/statemachine/engine/core/utils.go
@@ -98,6 +98,14 @@ func (p *ProcessContextBuilder) WithIsAsyncExecution(async 
bool) *ProcessContext
        return p
 }
 
+// WithStateInstance stores state in the process context under
+// constant.VarNameStateInst and returns the builder for chaining. A nil state
+// leaves the process context untouched.
+func (p *ProcessContextBuilder) WithStateInstance(state statelang.StateInstance) *ProcessContextBuilder {
+       if state == nil {
+               return p
+       }
+       p.processContext.SetVariable(constant.VarNameStateInst, state)
+       return p
+}
+
 func (p *ProcessContextBuilder) Build() ProcessContext {
        return p.processContext
 }
diff --git a/pkg/util/errors/code.go b/pkg/util/errors/code.go
index 9b15e72a..5d265df7 100644
--- a/pkg/util/errors/code.go
+++ b/pkg/util/errors/code.go
@@ -111,4 +111,6 @@ const (
        InvalidParameter
        // OperationDenied Operation denied
        OperationDenied
+       // ForwardInvalid Forward invalid
+       ForwardInvalid
 )


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org
For additional commands, e-mail: notifications-h...@seata.apache.org

Reply via email to