This is an automated email from the ASF dual-hosted git repository.

yixia pushed a commit to branch feature/saga
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git


The following commit(s) were added to refs/heads/feature/saga by this push:
     new 8d68ae44 feature: sequential execution of state machine in Saga (#681)
8d68ae44 is described below

commit 8d68ae4494db947daa06cc503f8d532bc9e2c908
Author: A Cabbage <[email protected]>
AuthorDate: Thu Jun 27 16:44:28 2024 +0800

    feature: sequential execution of state machine in Saga (#681)
---
 go.mod                                             |   1 +
 go.sum                                             |   2 +
 pkg/saga/statemachine/constant/constant.go         |  93 +++--
 .../{process_ctrl => core}/bussiness_processor.go  |  27 +-
 .../engine/core/compensation_holder.go             |  63 +++
 .../engine/core/default_statemachine_config.go     | 199 ++++++++++
 pkg/saga/statemachine/engine/core/engine_utils.go  | 149 ++++++++
 .../statemachine/engine/{events => core}/event.go  |   2 +-
 pkg/saga/statemachine/engine/core/event_bus.go     | 113 ++++++
 .../engine/{events => core}/event_consumer.go      |  15 +-
 .../engine/{events => core}/event_publisher.go     |   2 +-
 pkg/saga/statemachine/engine/core/instruction.go   |  96 +++++
 .../engine/core/loop_context_holder.go             |  92 +++++
 .../statemachine/engine/core/loop_task_utils.go    |  40 ++
 .../statemachine/engine/core/parameter_utils.go    | 132 +++++++
 .../{process_ctrl => core}/process_context.go      |   2 +-
 .../statemachine/engine/core/process_controller.go |  31 ++
 .../core/process_ctrl_statemachine_engine.go       | 424 +++++++++++++++++++++
 .../statemachine/engine/core/process_router.go     | 194 ++++++++++
 .../process_state.go}                              |  51 +--
 pkg/saga/statemachine/engine/core/state_router.go  | 151 ++++++++
 .../engine/{ => core}/statemachine_config.go       |  22 +-
 .../engine/core/statemachine_engine.go             |  16 +
 .../engine/core/statemachine_engine_test.go        |  15 +
 .../engine/{store => core}/statemachine_store.go   |  15 +-
 .../statemachine/engine/core/status_decision.go    | 188 +++++++++
 pkg/saga/statemachine/engine/{ => core}/utils.go   |  14 +-
 .../engine/default_statemachine_config.go          | 123 ------
 pkg/saga/statemachine/engine/events/event_bus.go   |  49 ---
 .../statemachine/engine/exception/exception.go     |  54 +++
 pkg/saga/statemachine/engine/expr/expression.go    |  83 ++++
 .../engine/process_ctrl/instruction.go             |  24 --
 .../engine/process_ctrl_statemachine_engine.go     | 125 ------
 .../statemachine/engine/statemachine_engine.go     |  16 -
 .../engine/statemachine_engine_test.go             |   7 -
 .../engine/status_decision/status_decision.go      |   4 -
 .../handlers/service_task_state_handler.go         | 174 +++++++++
 .../process}/process_type.go                       |   2 +-
 .../statelang/parser/sub_state_machine_parser.go   |   2 +-
 .../statelang/state/loop_start_state.go            |  22 ++
 .../statelang/state/sub_state_machine.go           |   2 +-
 .../statemachine/statelang/state/task_state.go     |  20 +-
 .../statelang/statemachine_instance.go             |  20 +-
 pkg/saga/statemachine/{engine => }/store/db/db.go  |   0
 .../statemachine/{engine => }/store/db/db_test.go  |   0
 .../{engine => }/store/db/statelang.go             |   0
 .../{engine => }/store/db/statelang_test.go        |   0
 .../statemachine/{engine => }/store/db/statelog.go |  22 +-
 .../{engine => }/store/db/statelog_test.go         |  24 +-
 .../store/repository/state_machine_repository.go   |  34 ++
 pkg/tm/transaction_executor_test.go                |   2 +-
 pkg/util/collection/collection.go                  |  44 ++-
 pkg/util/collection/collection_test.go             |  23 ++
 pkg/util/errors/code.go                            |  11 +
 54 files changed, 2533 insertions(+), 503 deletions(-)

diff --git a/go.mod b/go.mod
index d24d93c3..c927882f 100644
--- a/go.mod
+++ b/go.mod
@@ -34,6 +34,7 @@ require (
        github.com/agiledragon/gomonkey v2.0.2+incompatible
        github.com/agiledragon/gomonkey/v2 v2.9.0
        github.com/mattn/go-sqlite3 v1.14.19
+       golang.org/x/sync v0.6.0
        google.golang.org/protobuf v1.30.0
 )
 
diff --git a/go.sum b/go.sum
index 9c0fbc0b..e269c87d 100644
--- a/go.sum
+++ b/go.sum
@@ -942,6 +942,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod 
h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
+golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/pkg/saga/statemachine/constant/constant.go 
b/pkg/saga/statemachine/constant/constant.go
index ef332c18..a089e7d5 100644
--- a/pkg/saga/statemachine/constant/constant.go
+++ b/pkg/saga/statemachine/constant/constant.go
@@ -1,29 +1,72 @@
 package constant
 
 const (
-       VarNameProcessType                  string = "_ProcessType_"
-       VarNameOperationName                string = "_operation_name_"
-       OperationNameStart                  string = "start"
-       VarNameAsyncCallback                string = "_async_callback_"
-       VarNameStateMachineInst             string = 
"_current_statemachine_instance_"
-       VarNameStateMachine                 string = "_current_statemachine_"
-       VarNameStateMachineEngine           string = 
"_current_statemachine_engine_"
-       VarNameStateMachineConfig           string = "_statemachine_config_"
-       VarNameStateMachineContext          string = "context"
-       VarNameIsAsyncExecution             string = "_is_async_execution_"
-       VarNameStateInst                    string = "_current_state_instance_"
-       SeqEntityStateMachineInst           string = "STATE_MACHINE_INST"
-       SeqEntityStateInst                  string = "STATE_INST"
-       VarNameBusinesskey                  string = "_business_key_"
-       VarNameParentId                     string = "_parent_id_"
-       StateTypeServiceTask                string = "ServiceTask"
-       StateTypeChoice                     string = "Choice"
-       StateTypeSubStateMachine            string = "SubStateMachine"
-       CompensateSubMachine                string = "CompensateSubMachine"
-       StateTypeSucceed                    string = "Succeed"
-       StateTypeFail                       string = "Fail"
-       StateTypeCompensationTrigger        string = "CompensationTrigger"
-       StateTypeScriptTask                 string = "ScriptTask"
-       CompensateSubMachineStateNamePrefix string = 
"_compensate_sub_machine_state_"
-       DefaultScriptType                   string = "groovy"
+       // region State Types
+       StateTypeServiceTask          string = "ServiceTask"
+       StateTypeChoice               string = "Choice"
+       StateTypeFail                 string = "Fail"
+       StateTypeSucceed              string = "Succeed"
+       StateTypeCompensationTrigger  string = "CompensationTrigger"
+       StateTypeSubStateMachine      string = "SubStateMachine"
+       StateTypeCompensateSubMachine string = "CompensateSubMachine"
+       StateTypeScriptTask           string = "ScriptTask"
+       StateTypeLoopStart            string = "LoopStart"
+       // end region
+
+       // region Service Types
+       ServiceTypeGRPC string = "GRPC"
+       // end region
+
+       // region System Variables
+       VarNameOutputParams                  string = "outputParams"
+       VarNameProcessType                   string = "_ProcessType_"
+       VarNameOperationName                 string = "_operation_name_"
+       OperationNameStart                   string = "start"
+       OperationNameCompensate              string = "compensate"
+       VarNameAsyncCallback                 string = "_async_callback_"
+       VarNameCurrentExceptionRoute         string = 
"_current_exception_route_"
+       VarNameIsExceptionNotCatch           string = "_is_exception_not_catch_"
+       VarNameSubMachineParentId            string = "_sub_machine_parent_id_"
+       VarNameCurrentChoice                 string = "_current_choice_"
+       VarNameStateMachineInst              string = 
"_current_statemachine_instance_"
+       VarNameStateMachine                  string = "_current_statemachine_"
+       VarNameStateMachineEngine            string = 
"_current_statemachine_engine_"
+       VarNameStateMachineConfig            string = "_statemachine_config_"
+       VarNameStateMachineContext           string = "context"
+       VarNameIsAsyncExecution              string = "_is_async_execution_"
+       VarNameStateInst                     string = "_current_state_instance_"
+       VarNameBusinesskey                   string = "_business_key_"
+       VarNameParentId                      string = "_parent_id_"
+       VarNameCurrentException              string = "currentException"
+       CompensateSubMachineStateNamePrefix  string = 
"_compensate_sub_machine_state_"
+       DefaultScriptType                    string = "groovy"
+       VarNameSyncExeStack                  string = "_sync_execution_stack_"
+       VarNameInputParams                   string = "inputParams"
+       VarNameIsLoopState                   string = "_is_loop_state_"
+       VarNameCurrentCompensateTriggerState string = "_is_compensating_"
+       VarNameCurrentCompensationHolder     string = 
"_current_compensation_holder_"
+       VarNameFirstCompensationStateStarted string = 
"_first_compensation_state_started"
+       VarNameCurrentLoopContextHolder      string = 
"_current_loop_context_holder_"
+       // TODO: this lock in process context only has one, try to add more to 
add concurrent
+       VarNameProcessContextMutexLock string = "_current_context_mutex_lock"
+       VarNameFailEndStateFlag        string = "_fail_end_state_flag_"
+       // end region
+
+       // region of loop
+       LoopCounter                string = "loopCounter"
+       LoopSemaphore              string = "loopSemaphore"
+       LoopResult                 string = "loopResult"
+       NumberOfInstances          string = "nrOfInstances"
+       NumberOfActiveInstances    string = "nrOfActiveInstances"
+       NumberOfCompletedInstances string = "nrOfCompletedInstances"
+       // end region
+
+       // region others
+       SeqEntityStateMachineInst string = "STATE_MACHINE_INST"
+       SeqEntityStateInst        string = "STATE_INST"
+       OperationNameForward      string = "forward"
+       LoopStateNamePattern      string = "-loop-"
+       // end region
+
+       SeperatorParentId string = ":"
 )
diff --git a/pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go 
b/pkg/saga/statemachine/engine/core/bussiness_processor.go
similarity index 79%
rename from pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go
rename to pkg/saga/statemachine/engine/core/bussiness_processor.go
index c12a7c79..6b0a2bb4 100644
--- a/pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go
+++ b/pkg/saga/statemachine/engine/core/bussiness_processor.go
@@ -1,9 +1,10 @@
-package process_ctrl
+package core
 
 import (
        "context"
        "github.com/pkg/errors"
        "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+       "github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl/process"
        "sync"
 )
 
@@ -19,14 +20,14 @@ type DefaultBusinessProcessor struct {
        mu              sync.RWMutex
 }
 
-func (d *DefaultBusinessProcessor) RegistryProcessHandler(processType 
ProcessType, processHandler ProcessHandler) {
+func (d *DefaultBusinessProcessor) RegistryProcessHandler(processType 
process.ProcessType, processHandler ProcessHandler) {
        d.mu.Lock()
        defer d.mu.Unlock()
 
        d.processHandlers[string(processType)] = processHandler
 }
 
-func (d *DefaultBusinessProcessor) RegistryRouterHandler(processType 
ProcessType, routerHandler RouterHandler) {
+func (d *DefaultBusinessProcessor) RegistryRouterHandler(processType 
process.ProcessType, routerHandler RouterHandler) {
        d.mu.Lock()
        defer d.mu.Unlock()
 
@@ -55,17 +56,17 @@ func (d *DefaultBusinessProcessor) Route(ctx 
context.Context, processContext Pro
        return routerHandler.Route(ctx, processContext)
 }
 
-func (d *DefaultBusinessProcessor) getProcessHandler(processType ProcessType) 
(ProcessHandler, error) {
+func (d *DefaultBusinessProcessor) getProcessHandler(processType 
process.ProcessType) (ProcessHandler, error) {
        d.mu.RLock()
        defer d.mu.RUnlock()
        processHandler, ok := d.processHandlers[string(processType)]
        if !ok {
-               return nil, errors.New("Cannot find process handler by type " + 
string(processType))
+               return nil, errors.New("Cannot find Process handler by type " + 
string(processType))
        }
        return processHandler, nil
 }
 
-func (d *DefaultBusinessProcessor) getRouterHandler(processType ProcessType) 
(RouterHandler, error) {
+func (d *DefaultBusinessProcessor) getRouterHandler(processType 
process.ProcessType) (RouterHandler, error) {
        d.mu.RLock()
        defer d.mu.RUnlock()
        routerHandler, ok := d.routerHandlers[string(processType)]
@@ -75,18 +76,10 @@ func (d *DefaultBusinessProcessor) 
getRouterHandler(processType ProcessType) (Ro
        return routerHandler, nil
 }
 
-func (d *DefaultBusinessProcessor) matchProcessType(processContext 
ProcessContext) ProcessType {
+func (d *DefaultBusinessProcessor) matchProcessType(processContext 
ProcessContext) process.ProcessType {
        ok := processContext.HasVariable(constant.VarNameProcessType)
        if ok {
-               return 
processContext.GetVariable(constant.VarNameProcessType).(ProcessType)
+               return 
processContext.GetVariable(constant.VarNameProcessType).(process.ProcessType)
        }
-       return StateLang
-}
-
-type ProcessHandler interface {
-       Process(ctx context.Context, processContext ProcessContext) error
-}
-
-type RouterHandler interface {
-       Route(ctx context.Context, processContext ProcessContext) error
+       return process.StateLang
 }
diff --git a/pkg/saga/statemachine/engine/core/compensation_holder.go 
b/pkg/saga/statemachine/engine/core/compensation_holder.go
new file mode 100644
index 00000000..562cb020
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/compensation_holder.go
@@ -0,0 +1,63 @@
+package core
+
+import (
+       "context"
+       "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+       "github.com/seata/seata-go/pkg/util/collection"
+       "sync"
+)
+
+type CompensationHolder struct {
+       statesNeedCompensation     *sync.Map
+       statesForCompensation      *sync.Map
+       stateStackNeedCompensation *collection.Stack
+}
+
+func (c *CompensationHolder) StatesNeedCompensation() *sync.Map {
+       return c.statesNeedCompensation
+}
+
+func (c *CompensationHolder) SetStatesNeedCompensation(statesNeedCompensation 
*sync.Map) {
+       c.statesNeedCompensation = statesNeedCompensation
+}
+
+func (c *CompensationHolder) StatesForCompensation() *sync.Map {
+       return c.statesForCompensation
+}
+
+func (c *CompensationHolder) SetStatesForCompensation(statesForCompensation 
*sync.Map) {
+       c.statesForCompensation = statesForCompensation
+}
+
+func (c *CompensationHolder) StateStackNeedCompensation() *collection.Stack {
+       return c.stateStackNeedCompensation
+}
+
+func (c *CompensationHolder) 
SetStateStackNeedCompensation(stateStackNeedCompensation *collection.Stack) {
+       c.stateStackNeedCompensation = stateStackNeedCompensation
+}
+
+func (c *CompensationHolder) AddToBeCompensatedState(stateName string, 
toBeCompensatedState statelang.StateInstance) {
+       c.statesNeedCompensation.Store(stateName, toBeCompensatedState)
+}
+
+func NewCompensationHolder() *CompensationHolder {
+       return &CompensationHolder{
+               statesNeedCompensation:     &sync.Map{},
+               statesForCompensation:      &sync.Map{},
+               stateStackNeedCompensation: collection.NewStack(),
+       }
+}
+
+func GetCurrentCompensationHolder(ctx context.Context, processContext 
ProcessContext, forceCreate bool) *CompensationHolder {
+       compensationholder := 
processContext.GetVariable(constant.VarNameCurrentCompensationHolder).(*CompensationHolder)
+       lock := 
processContext.GetVariable(constant.VarNameProcessContextMutexLock).(*sync.Mutex)
+       lock.Lock()
+       defer lock.Unlock()
+       if compensationholder == nil && forceCreate {
+               compensationholder = NewCompensationHolder()
+               
processContext.SetVariable(constant.VarNameCurrentCompensationHolder, 
compensationholder)
+       }
+       return compensationholder
+}
diff --git a/pkg/saga/statemachine/engine/core/default_statemachine_config.go 
b/pkg/saga/statemachine/engine/core/default_statemachine_config.go
new file mode 100644
index 00000000..c7692696
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/default_statemachine_config.go
@@ -0,0 +1,199 @@
+package core
+
+import (
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/expr"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/invoker"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence"
+       "sync"
+)
+
+const (
+       DefaultTransOperTimeout     = 60000 * 30
+       DefaultServiceInvokeTimeout = 60000 * 5
+)
+
+type DefaultStateMachineConfig struct {
+       // Configuration
+       transOperationTimeout int
+       serviceInvokeTimeout  int
+       charset               string
+       defaultTenantId       string
+
+       // Components
+
+       // Event publisher
+       syncProcessCtrlEventPublisher  EventPublisher
+       asyncProcessCtrlEventPublisher EventPublisher
+
+       // Store related components
+       stateLogRepository     StateLogRepository
+       stateLogStore          StateLogStore
+       stateLangStore         StateLangStore
+       stateMachineRepository StateMachineRepository
+
+       // Expression related components
+       expressionFactoryManager expr.ExpressionFactoryManager
+       expressionResolver       expr.ExpressionResolver
+
+       // Invoker related components
+       serviceInvokerManager invoker.ServiceInvokerManager
+       scriptInvokerManager  invoker.ScriptInvokerManager
+
+       // Other components
+       statusDecisionStrategy StatusDecisionStrategy
+       seqGenerator           sequence.SeqGenerator
+       componentLock          *sync.Mutex
+}
+
+func (c *DefaultStateMachineConfig) ComponentLock() *sync.Mutex {
+       return c.componentLock
+}
+
+func (c *DefaultStateMachineConfig) SetComponentLock(componentLock 
*sync.Mutex) {
+       c.componentLock = componentLock
+}
+
+func (c *DefaultStateMachineConfig) 
SetTransOperationTimeout(transOperationTimeout int) {
+       c.transOperationTimeout = transOperationTimeout
+}
+
+func (c *DefaultStateMachineConfig) 
SetServiceInvokeTimeout(serviceInvokeTimeout int) {
+       c.serviceInvokeTimeout = serviceInvokeTimeout
+}
+
+func (c *DefaultStateMachineConfig) SetCharset(charset string) {
+       c.charset = charset
+}
+
+func (c *DefaultStateMachineConfig) SetDefaultTenantId(defaultTenantId string) 
{
+       c.defaultTenantId = defaultTenantId
+}
+
+func (c *DefaultStateMachineConfig) 
SetSyncProcessCtrlEventPublisher(syncProcessCtrlEventPublisher EventPublisher) {
+       c.syncProcessCtrlEventPublisher = syncProcessCtrlEventPublisher
+}
+
+func (c *DefaultStateMachineConfig) 
SetAsyncProcessCtrlEventPublisher(asyncProcessCtrlEventPublisher 
EventPublisher) {
+       c.asyncProcessCtrlEventPublisher = asyncProcessCtrlEventPublisher
+}
+
+func (c *DefaultStateMachineConfig) SetStateLogRepository(stateLogRepository 
StateLogRepository) {
+       c.stateLogRepository = stateLogRepository
+}
+
+func (c *DefaultStateMachineConfig) SetStateLogStore(stateLogStore 
StateLogStore) {
+       c.stateLogStore = stateLogStore
+}
+
+func (c *DefaultStateMachineConfig) SetStateLangStore(stateLangStore 
StateLangStore) {
+       c.stateLangStore = stateLangStore
+}
+
+func (c *DefaultStateMachineConfig) 
SetStateMachineRepository(stateMachineRepository StateMachineRepository) {
+       c.stateMachineRepository = stateMachineRepository
+}
+
+func (c *DefaultStateMachineConfig) 
SetExpressionFactoryManager(expressionFactoryManager 
expr.ExpressionFactoryManager) {
+       c.expressionFactoryManager = expressionFactoryManager
+}
+
+func (c *DefaultStateMachineConfig) SetExpressionResolver(expressionResolver 
expr.ExpressionResolver) {
+       c.expressionResolver = expressionResolver
+}
+
+func (c *DefaultStateMachineConfig) 
SetServiceInvokerManager(serviceInvokerManager invoker.ServiceInvokerManager) {
+       c.serviceInvokerManager = serviceInvokerManager
+}
+
+func (c *DefaultStateMachineConfig) 
SetScriptInvokerManager(scriptInvokerManager invoker.ScriptInvokerManager) {
+       c.scriptInvokerManager = scriptInvokerManager
+}
+
+func (c *DefaultStateMachineConfig) 
SetStatusDecisionStrategy(statusDecisionStrategy StatusDecisionStrategy) {
+       c.statusDecisionStrategy = statusDecisionStrategy
+}
+
+func (c *DefaultStateMachineConfig) SetSeqGenerator(seqGenerator 
sequence.SeqGenerator) {
+       c.seqGenerator = seqGenerator
+}
+
+func (c *DefaultStateMachineConfig) StateLogRepository() StateLogRepository {
+       return c.stateLogRepository
+}
+
+func (c *DefaultStateMachineConfig) StateMachineRepository() 
StateMachineRepository {
+       return c.stateMachineRepository
+}
+
+func (c *DefaultStateMachineConfig) StateLogStore() StateLogStore {
+       return c.stateLogStore
+}
+
+func (c *DefaultStateMachineConfig) StateLangStore() StateLangStore {
+       return c.stateLangStore
+}
+
+func (c *DefaultStateMachineConfig) ExpressionFactoryManager() 
expr.ExpressionFactoryManager {
+       return c.expressionFactoryManager
+}
+
+func (c *DefaultStateMachineConfig) ExpressionResolver() 
expr.ExpressionResolver {
+       return c.expressionResolver
+}
+
+func (c *DefaultStateMachineConfig) SeqGenerator() sequence.SeqGenerator {
+       return c.seqGenerator
+}
+
+func (c *DefaultStateMachineConfig) StatusDecisionStrategy() 
StatusDecisionStrategy {
+       return c.statusDecisionStrategy
+}
+
+func (c *DefaultStateMachineConfig) EventPublisher() EventPublisher {
+       return c.syncProcessCtrlEventPublisher
+}
+
+func (c *DefaultStateMachineConfig) AsyncEventPublisher() EventPublisher {
+       return c.asyncProcessCtrlEventPublisher
+}
+
+func (c *DefaultStateMachineConfig) ServiceInvokerManager() 
invoker.ServiceInvokerManager {
+       return c.serviceInvokerManager
+}
+
+func (c *DefaultStateMachineConfig) ScriptInvokerManager() 
invoker.ScriptInvokerManager {
+       return c.scriptInvokerManager
+}
+
+func (c *DefaultStateMachineConfig) CharSet() string {
+       return c.charset
+}
+
+func (c *DefaultStateMachineConfig) SetCharSet(charset string) {
+       c.charset = charset
+}
+
+func (c *DefaultStateMachineConfig) DefaultTenantId() string {
+       return c.defaultTenantId
+}
+
+func (c *DefaultStateMachineConfig) TransOperationTimeout() int {
+       return c.transOperationTimeout
+}
+
+func (c *DefaultStateMachineConfig) ServiceInvokeTimeout() int {
+       return c.serviceInvokeTimeout
+}
+
+func NewDefaultStateMachineConfig() *DefaultStateMachineConfig {
+       c := &DefaultStateMachineConfig{
+               transOperationTimeout: DefaultTransOperTimeout,
+               serviceInvokeTimeout:  DefaultServiceInvokeTimeout,
+               charset:               "UTF-8",
+               defaultTenantId:       "000001",
+               componentLock:         &sync.Mutex{},
+       }
+
+       // TODO: init config
+       return c
+}
diff --git a/pkg/saga/statemachine/engine/core/engine_utils.go 
b/pkg/saga/statemachine/engine/core/engine_utils.go
new file mode 100644
index 00000000..e1dcd4de
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/engine_utils.go
@@ -0,0 +1,149 @@
+package core
+
+import (
+       "context"
+       "errors"
+       "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
+       "github.com/seata/seata-go/pkg/util/log"
+       "golang.org/x/sync/semaphore"
+       "reflect"
+       "strings"
+       "sync"
+       "time"
+)
+
+func EndStateMachine(ctx context.Context, processContext ProcessContext) error 
{
+       if processContext.HasVariable(constant.VarNameIsLoopState) {
+               if processContext.HasVariable(constant.LoopSemaphore) {
+                       weighted, ok := 
processContext.GetVariable(constant.LoopSemaphore).(semaphore.Weighted)
+                       if !ok {
+                               return errors.New("semaphore type is not 
weighted")
+                       }
+                       weighted.Release(1)
+               }
+       }
+
+       stateMachineInstance, ok := 
processContext.GetVariable(constant.VarNameStateMachineInst).(statelang.StateMachineInstance)
+       if !ok {
+               return errors.New("state machine instance type is not 
statelang.StateMachineInstance")
+       }
+
+       stateMachineInstance.SetEndTime(time.Now())
+
+       exp, ok := 
processContext.GetVariable(constant.VarNameCurrentException).(error)
+       if !ok {
+               return errors.New("exception type is not error")
+       }
+
+       if exp != nil {
+               stateMachineInstance.SetException(exp)
+               log.Debugf("Exception Occurred: %s", exp)
+       }
+
+       stateMachineConfig, ok := 
processContext.GetVariable(constant.VarNameStateMachineConfig).(StateMachineConfig)
+
+       if err := 
stateMachineConfig.StatusDecisionStrategy().DecideOnEndState(ctx, 
processContext, stateMachineInstance, exp); err != nil {
+               return err
+       }
+
+       contextParams, ok := 
processContext.GetVariable(constant.VarNameStateMachineContext).(map[string]interface{})
+       if !ok {
+               return errors.New("state machine context type is not 
map[string]interface{}")
+       }
+       endParams := stateMachineInstance.EndParams()
+       for k, v := range contextParams {
+               endParams[k] = v
+       }
+       stateMachineInstance.SetEndParams(endParams)
+
+       stateInstruction, ok := 
processContext.GetInstruction().(StateInstruction)
+       if !ok {
+               return errors.New("state instruction type is not 
process_ctrl.StateInstruction")
+       }
+       stateInstruction.SetEnd(true)
+
+       stateMachineInstance.SetRunning(false)
+       stateMachineInstance.SetEndTime(time.Now())
+
+       if stateMachineInstance.StateMachine().IsPersist() && 
stateMachineConfig.StateLangStore() != nil {
+               err := 
stateMachineConfig.StateLogStore().RecordStateMachineFinished(ctx, 
stateMachineInstance, processContext)
+               if err != nil {
+                       return err
+               }
+       }
+
+       callBack, ok := 
processContext.GetVariable(constant.VarNameAsyncCallback).(CallBack)
+       if ok {
+               if exp != nil {
+                       callBack.OnError(ctx, processContext, 
stateMachineInstance, exp)
+               } else {
+                       callBack.OnFinished(ctx, processContext, 
stateMachineInstance)
+               }
+       }
+
+       return nil
+}
+
+func HandleException(processContext ProcessContext, abstractTaskState 
*state.AbstractTaskState, err error) {
+       catches := abstractTaskState.Catches()
+       if catches != nil && len(catches) != 0 {
+               for _, exceptionMatch := range catches {
+                       exceptions := exceptionMatch.Exceptions()
+                       exceptionTypes := exceptionMatch.ExceptionTypes()
+                       if exceptions != nil && len(exceptions) != 0 {
+                               if exceptionTypes == nil {
+                                       lock := 
processContext.GetVariable(constant.VarNameProcessContextMutexLock).(*sync.Mutex)
+                                       lock.Lock()
+                                       defer lock.Unlock()
+                                       error := errors.New("")
+                                       for i := 0; i < len(exceptions); i++ {
+                                               exceptionTypes = 
append(exceptionTypes, reflect.TypeOf(error))
+                                       }
+                               }
+
+                               exceptionMatch.SetExceptionTypes(exceptionTypes)
+                       }
+
+                       for i, _ := range exceptionTypes {
+                               if reflect.TypeOf(err) == exceptionTypes[i] {
+                                       // HACK: we can not get error type in 
config file during runtime, so we use exception str
+                                       if strings.Contains(err.Error(), 
exceptions[i]) {
+                                               hierarchicalProcessContext := 
processContext.(HierarchicalProcessContext)
+                                               
hierarchicalProcessContext.SetVariable(constant.VarNameCurrentExceptionRoute, 
exceptionMatch.Next())
+                                               return
+                                       }
+                               }
+                       }
+               }
+       }
+
+       log.Error("Task execution failed and no catches configured")
+       hierarchicalProcessContext := 
processContext.(HierarchicalProcessContext)
+       
hierarchicalProcessContext.SetVariable(constant.VarNameIsExceptionNotCatch, 
true)
+}
+
+// GetOriginStateName get origin state name without suffix like fork
+func GetOriginStateName(stateInstance statelang.StateInstance) string {
+       stateName := stateInstance.Name()
+       if stateName != "" {
+               end := strings.LastIndex(stateName, 
constant.LoopStateNamePattern)
+               if end > -1 {
+                       return stateName[:end+1]
+               }
+       }
+       return stateName
+}
+
+// IsTimeout test if is timeout
+func IsTimeout(gmtUpdated time.Time, timeoutMillis int) bool {
+       if timeoutMillis < 0 {
+               return false
+       }
+       return time.Now().Unix()-gmtUpdated.Unix() > int64(timeoutMillis)
+}
+
+func GenerateParentId(stateInstance statelang.StateInstance) string {
+       return stateInstance.MachineInstanceID() + constant.SeperatorParentId + 
stateInstance.ID()
+}
diff --git a/pkg/saga/statemachine/engine/events/event.go 
b/pkg/saga/statemachine/engine/core/event.go
similarity index 63%
rename from pkg/saga/statemachine/engine/events/event.go
rename to pkg/saga/statemachine/engine/core/event.go
index 8f142713..4f7c18b2 100644
--- a/pkg/saga/statemachine/engine/events/event.go
+++ b/pkg/saga/statemachine/engine/core/event.go
@@ -1,4 +1,4 @@
-package events
+package core
 
 type Event interface {
 }
diff --git a/pkg/saga/statemachine/engine/core/event_bus.go 
b/pkg/saga/statemachine/engine/core/event_bus.go
new file mode 100644
index 00000000..47d617d4
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/event_bus.go
@@ -0,0 +1,113 @@
+package core
+
+import (
+       "context"
+       "fmt"
+       "github.com/pkg/errors"
+       "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+       "github.com/seata/seata-go/pkg/util/collection"
+       "github.com/seata/seata-go/pkg/util/log"
+)
+
+type EventBus interface {
+       Offer(ctx context.Context, event Event) (bool, error)
+
+       EventConsumerList(event Event) []EventConsumer
+
+       RegisterEventConsumer(consumer EventConsumer)
+}
+
+type BaseEventBus struct {
+       eventConsumerList []EventConsumer
+}
+
+func (b *BaseEventBus) RegisterEventConsumer(consumer EventConsumer) {
+       if b.eventConsumerList == nil {
+               b.eventConsumerList = make([]EventConsumer, 0)
+       }
+       b.eventConsumerList = append(b.eventConsumerList, consumer)
+}
+
+func (b *BaseEventBus) EventConsumerList(event Event) []EventConsumer {
+       var acceptedConsumerList = make([]EventConsumer, 0)
+       for i := range b.eventConsumerList {
+               eventConsumer := b.eventConsumerList[i]
+               if eventConsumer.Accept(event) {
+                       acceptedConsumerList = append(acceptedConsumerList, 
eventConsumer)
+               }
+       }
+       return acceptedConsumerList
+}
+
+type DirectEventBus struct {
+       BaseEventBus
+}
+
+func (d DirectEventBus) Offer(ctx context.Context, event Event) (bool, error) {
+       eventConsumerList := d.EventConsumerList(event)
+       if len(eventConsumerList) == 0 {
+               log.Debugf("cannot find event handler by type: %T", event)
+               return false, nil
+       }
+
+       isFirstEvent := true
+       processContext, ok := event.(ProcessContext)
+       if !ok {
+               log.Errorf("event %T is illegal, required 
process_ctrl.ProcessContext", event)
+               return false, nil
+       }
+
+       stack := 
processContext.GetVariable(constant.VarNameSyncExeStack).(*collection.Stack)
+       if stack == nil {
+               stack = collection.NewStack()
+               processContext.SetVariable(constant.VarNameSyncExeStack, stack)
+               isFirstEvent = true
+       }
+
+       stack.Push(processContext)
+       if isFirstEvent {
+               for stack.Len() > 0 {
+                       currentContext := stack.Pop().(ProcessContext)
+                       for _, eventConsumer := range eventConsumerList {
+                               err := eventConsumer.Process(ctx, 
currentContext)
+                               if err != nil {
+                                       log.Errorf("process event %T error: 
%s", event, err.Error())
+                                       return false, err
+                               }
+                       }
+               }
+       }
+
+       return true, nil
+}
+
+type AsyncEventBus struct {
+       BaseEventBus
+}
+
+func (a AsyncEventBus) Offer(ctx context.Context, event Event) (bool, error) {
+       eventConsumerList := a.EventConsumerList(event)
+       if len(eventConsumerList) == 0 {
+               errStr := fmt.Sprintf("cannot find event handler by type: %T", 
event)
+               log.Errorf(errStr)
+               return false, errors.New(errStr)
+       }
+
+       processContext, ok := event.(ProcessContext)
+       if !ok {
+               errStr := fmt.Sprintf("event %T is illegal, required 
process_ctrl.ProcessContext", event)
+               log.Errorf(errStr)
+               return false, errors.New(errStr)
+       }
+
+       for _, eventConsumer := range eventConsumerList {
+               go func() {
+                       err := eventConsumer.Process(ctx, processContext)
+                       if err != nil {
+                               log.Errorf("process event %T error: %s", event, 
err.Error())
+                       }
+               }()
+       }
+
+       return true, nil
+}
diff --git a/pkg/saga/statemachine/engine/events/event_consumer.go 
b/pkg/saga/statemachine/engine/core/event_consumer.go
similarity index 53%
rename from pkg/saga/statemachine/engine/events/event_consumer.go
rename to pkg/saga/statemachine/engine/core/event_consumer.go
index 426da15e..1df21bdb 100644
--- a/pkg/saga/statemachine/engine/events/event_consumer.go
+++ b/pkg/saga/statemachine/engine/core/event_consumer.go
@@ -1,8 +1,9 @@
-package events
+package core
 
 import (
        "context"
-       "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
+       "fmt"
+       "github.com/pkg/errors"
 )
 
 type EventConsumer interface {
@@ -12,6 +13,7 @@ type EventConsumer interface {
 }
 
 type ProcessCtrlEventConsumer struct {
+       processController ProcessController
 }
 
 func (p ProcessCtrlEventConsumer) Accept(event Event) bool {
@@ -19,11 +21,14 @@ func (p ProcessCtrlEventConsumer) Accept(event Event) bool {
                return false
        }
 
-       _, ok := event.(process_ctrl.ProcessContext)
+       _, ok := event.(ProcessContext)
        return ok
 }
 
 func (p ProcessCtrlEventConsumer) Process(ctx context.Context, event Event) 
error {
-       //TODO implement me
-       panic("implement me")
+       processContext, ok := event.(ProcessContext)
+       if !ok {
+               return errors.New(fmt.Sprint("event %T is illegal, required 
process_ctrl.ProcessContext", event))
+       }
+       return p.processController.Process(ctx, processContext)
 }
diff --git a/pkg/saga/statemachine/engine/events/event_publisher.go 
b/pkg/saga/statemachine/engine/core/event_publisher.go
similarity index 96%
rename from pkg/saga/statemachine/engine/events/event_publisher.go
rename to pkg/saga/statemachine/engine/core/event_publisher.go
index 1fbfaec2..9ad86940 100644
--- a/pkg/saga/statemachine/engine/events/event_publisher.go
+++ b/pkg/saga/statemachine/engine/core/event_publisher.go
@@ -1,4 +1,4 @@
-package events
+package core
 
 import "context"
 
diff --git a/pkg/saga/statemachine/engine/core/instruction.go 
b/pkg/saga/statemachine/engine/core/instruction.go
new file mode 100644
index 00000000..ed81fc9c
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/instruction.go
@@ -0,0 +1,96 @@
+package core
+
+import (
+       "errors"
+       "fmt"
+       "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+)
+
+type Instruction interface {
+}
+
+type StateInstruction struct {
+       stateName        string
+       stateMachineName string
+       tenantId         string
+       end              bool
+       temporaryState   statelang.State
+}
+
+func NewStateInstruction(stateMachineName string, tenantId string) 
*StateInstruction {
+       return &StateInstruction{stateMachineName: stateMachineName, tenantId: 
tenantId}
+}
+
+func (s *StateInstruction) StateName() string {
+       return s.stateName
+}
+
+func (s *StateInstruction) SetStateName(stateName string) {
+       s.stateName = stateName
+}
+
+func (s *StateInstruction) StateMachineName() string {
+       return s.stateMachineName
+}
+
+func (s *StateInstruction) SetStateMachineName(stateMachineName string) {
+       s.stateMachineName = stateMachineName
+}
+
+func (s *StateInstruction) TenantId() string {
+       return s.tenantId
+}
+
+func (s *StateInstruction) SetTenantId(tenantId string) {
+       s.tenantId = tenantId
+}
+
+func (s *StateInstruction) End() bool {
+       return s.end
+}
+
+func (s *StateInstruction) SetEnd(end bool) {
+       s.end = end
+}
+
+func (s *StateInstruction) TemporaryState() statelang.State {
+       return s.temporaryState
+}
+
+func (s *StateInstruction) SetTemporaryState(temporaryState statelang.State) {
+       s.temporaryState = temporaryState
+}
+
+func (s *StateInstruction) GetState(context ProcessContext) (statelang.State, 
error) {
+       if s.temporaryState != nil {
+               return s.temporaryState, nil
+       }
+
+       if s.stateMachineName == "" {
+               return nil, errors.New("stateMachineName is required")
+       }
+
+       stateMachineConfig, ok := 
context.GetVariable(constant.VarNameStateMachineConfig).(StateMachineConfig)
+       if !ok {
+               return nil, errors.New("stateMachineConfig is required in 
context")
+       }
+       stateMachine, err := 
stateMachineConfig.StateMachineRepository().GetLastVersionStateMachine(s.stateMachineName,
 s.tenantId)
+       if err != nil {
+               return nil, errors.New("get stateMachine in state machine 
repository error")
+       }
+       if stateMachine == nil {
+               return nil, errors.New(fmt.Sprintf("stateMachine [%s] is not 
exist", s.stateMachineName))
+       }
+
+       if s.stateName == "" {
+               s.stateName = stateMachine.StartState()
+       }
+
+       state := stateMachine.States()[s.stateName]
+       if state == nil {
+               return nil, errors.New(fmt.Sprintf("state [%s] is not exist", 
s.stateName))
+       }
+
+       return state, nil
+}
diff --git a/pkg/saga/statemachine/engine/core/loop_context_holder.go 
b/pkg/saga/statemachine/engine/core/loop_context_holder.go
new file mode 100644
index 00000000..b1f69fe8
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/loop_context_holder.go
@@ -0,0 +1,92 @@
+package core
+
+import (
+       "context"
+       "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+       "sync"
+)
+
+type LoopContextHolder struct {
+       nrOfInstances                int32
+       nrOfActiveInstances          int32
+       nrOfCompletedInstances       int32
+       failEnd                      bool
+       completionConditionSatisfied bool
+       loopCounterStack             []int
+       forwardCounterStack          []int
+       collection                   interface{}
+}
+
+func NewLoopContextHolder() *LoopContextHolder {
+       return &LoopContextHolder{
+               nrOfInstances:                0,
+               nrOfActiveInstances:          0,
+               nrOfCompletedInstances:       0,
+               failEnd:                      false,
+               completionConditionSatisfied: false,
+               loopCounterStack:             make([]int, 0),
+               forwardCounterStack:          make([]int, 0),
+               collection:                   nil,
+       }
+}
+
+func GetCurrentLoopContextHolder(ctx context.Context, processContext 
ProcessContext, forceCreate bool) *LoopContextHolder {
+       mutex := 
processContext.GetVariable(constant.VarNameProcessContextMutexLock).(*sync.Mutex)
+       mutex.Lock()
+       defer mutex.Unlock()
+
+       loopContextHolder := 
processContext.GetVariable(constant.VarNameCurrentLoopContextHolder).(*LoopContextHolder)
+       if loopContextHolder == nil && forceCreate {
+               loopContextHolder = &LoopContextHolder{}
+               
processContext.SetVariable(constant.VarNameCurrentLoopContextHolder, 
loopContextHolder)
+       }
+       return loopContextHolder
+}
+
+func ClearCurrent(ctx context.Context, processContext ProcessContext) {
+       processContext.RemoveVariable(constant.VarNameCurrentLoopContextHolder)
+}
+
+func (l *LoopContextHolder) NrOfInstances() int32 {
+       return l.nrOfInstances
+}
+
+func (l *LoopContextHolder) NrOfActiveInstances() int32 {
+       return l.nrOfActiveInstances
+}
+
+func (l *LoopContextHolder) NrOfCompletedInstances() int32 {
+       return l.nrOfCompletedInstances
+}
+
+func (l *LoopContextHolder) FailEnd() bool {
+       return l.failEnd
+}
+
+func (l *LoopContextHolder) SetFailEnd(failEnd bool) {
+       l.failEnd = failEnd
+}
+
+func (l *LoopContextHolder) CompletionConditionSatisfied() bool {
+       return l.completionConditionSatisfied
+}
+
+func (l *LoopContextHolder) 
SetCompletionConditionSatisfied(completionConditionSatisfied bool) {
+       l.completionConditionSatisfied = completionConditionSatisfied
+}
+
+func (l *LoopContextHolder) LoopCounterStack() []int {
+       return l.loopCounterStack
+}
+
+func (l *LoopContextHolder) ForwardCounterStack() []int {
+       return l.forwardCounterStack
+}
+
+func (l *LoopContextHolder) Collection() interface{} {
+       return l.collection
+}
+
+func (l *LoopContextHolder) SetCollection(collection interface{}) {
+       l.collection = collection
+}
diff --git a/pkg/saga/statemachine/engine/core/loop_task_utils.go 
b/pkg/saga/statemachine/engine/core/loop_task_utils.go
new file mode 100644
index 00000000..d56ac7f0
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/loop_task_utils.go
@@ -0,0 +1,40 @@
+package core
+
+import (
+       "context"
+       "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
+       "github.com/seata/seata-go/pkg/util/log"
+)
+
+func GetLoopConfig(ctx context.Context, processContext ProcessContext, 
currentState statelang.State) state.Loop {
+       if matchLoop(currentState) {
+               taskState := currentState.(state.AbstractTaskState)
+               stateMachineInstance := 
processContext.GetVariable(constant.VarNameStateMachineInst).(statelang.StateMachineInstance)
+               stateMachineConfig := 
processContext.GetVariable(constant.VarNameStateMachineConfig).(StateMachineConfig)
+
+               if taskState.Loop() != nil {
+                       loop := taskState.Loop()
+                       collectionName := loop.Collection()
+                       if collectionName != "" {
+                               expression := 
CreateValueExpression(stateMachineConfig.ExpressionResolver(), collectionName)
+                               collection := GetValue(expression, 
stateMachineInstance.Context(), nil)
+                               collectionList := collection.([]any)
+                               if len(collectionList) > 0 {
+                                       current := 
GetCurrentLoopContextHolder(ctx, processContext, true)
+                                       current.SetCollection(collection)
+                                       return loop
+                               }
+                       }
+                       log.Warn("State [{}] loop collection param [{}] 
invalid", currentState.Name(), collectionName)
+               }
+
+       }
+       return nil
+}
+
+func matchLoop(currentState statelang.State) bool {
+       return currentState != nil && (constant.StateTypeServiceTask == 
currentState.Type() ||
+               constant.StateTypeScriptTask == currentState.Type() || 
constant.StateTypeSubStateMachine == currentState.Type())
+}
diff --git a/pkg/saga/statemachine/engine/core/parameter_utils.go 
b/pkg/saga/statemachine/engine/core/parameter_utils.go
new file mode 100644
index 00000000..680167cf
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/parameter_utils.go
@@ -0,0 +1,132 @@
+package core
+
+import (
+       "fmt"
+       "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/expr"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
+       "strings"
+       "sync"
+)
+
+func CreateInputParams(processContext ProcessContext, expressionResolver 
expr.ExpressionResolver,
+       stateInstance *statelang.StateInstanceImpl, serviceTaskState 
*state.AbstractTaskState, variablesFrom any) []any {
+       inputAssignments := serviceTaskState.Input()
+       if inputAssignments == nil || len(inputAssignments) == 0 {
+               return inputAssignments
+       }
+
+       inputExpressions := serviceTaskState.InputExpressions()
+       if inputExpressions == nil || len(inputExpressions) == 0 {
+               lock := 
processContext.GetVariable(constant.VarNameProcessContextMutexLock).(*sync.Mutex)
+               lock.Lock()
+               defer lock.Unlock()
+               inputExpressions = serviceTaskState.InputExpressions()
+               if inputExpressions == nil || len(inputExpressions) == 0 {
+                       inputExpressions = make([]any, 0, len(inputAssignments))
+
+                       for _, assignment := range inputAssignments {
+                               inputExpressions = append(inputExpressions, 
CreateValueExpression(expressionResolver, assignment))
+                       }
+               }
+               serviceTaskState.SetInputExpressions(inputExpressions)
+       }
+       inputValues := make([]any, 0, len(inputExpressions))
+       for _, valueExpression := range inputExpressions {
+               value := GetValue(valueExpression, variablesFrom, stateInstance)
+               inputValues = append(inputValues, value)
+       }
+
+       return inputValues
+}
+
+func CreateOutputParams(config StateMachineConfig, expressionResolver 
expr.ExpressionResolver,
+       serviceTaskState *state.AbstractTaskState, variablesFrom any) 
(map[string]any, error) {
+       outputAssignments := serviceTaskState.Output()
+       if outputAssignments == nil || len(outputAssignments) == 0 {
+               return make(map[string]any, 0), nil
+       }
+
+       outputExpressions := serviceTaskState.OutputExpressions()
+       if outputExpressions == nil {
+               config.ComponentLock().Lock()
+               defer config.ComponentLock().Unlock()
+               outputExpressions = serviceTaskState.OutputExpressions()
+               if outputExpressions == nil {
+                       outputExpressions = make(map[string]any, 
len(outputAssignments))
+                       for key, value := range outputAssignments {
+                               outputExpressions[key] = 
CreateValueExpression(expressionResolver, value)
+                       }
+               }
+               serviceTaskState.SetOutputExpressions(outputExpressions)
+       }
+       outputValues := make(map[string]any, len(outputExpressions))
+       for paramName, _ := range outputExpressions {
+               outputValues[paramName] = 
GetValue(outputExpressions[paramName], variablesFrom, nil)
+       }
+       return outputValues, nil
+}
+
+func CreateValueExpression(expressionResolver expr.ExpressionResolver, 
paramAssignment any) any {
+       var valueExpression any
+
+       switch paramAssignment.(type) {
+       case expr.Expression:
+               valueExpression = paramAssignment
+       case map[string]any:
+               paramMapAssignment := paramAssignment.(map[string]any)
+               paramMap := make(map[string]any, len(paramMapAssignment))
+               for key, value := range paramMapAssignment {
+                       paramMap[key] = 
CreateValueExpression(expressionResolver, value)
+               }
+               valueExpression = paramMap
+       case []any:
+               paramListAssignment := paramAssignment.([]any)
+               paramList := make([]any, 0, len(paramListAssignment))
+               for _, value := range paramListAssignment {
+                       paramList = append(paramList, 
CreateValueExpression(expressionResolver, value))
+               }
+               valueExpression = paramList
+       case string:
+               value := paramAssignment.(string)
+               if !strings.HasPrefix(value, "$") {
+                       valueExpression = paramAssignment
+               }
+               valueExpression = expressionResolver.Expression(value)
+       default:
+               valueExpression = paramAssignment
+       }
+       return valueExpression
+}
+
+func GetValue(valueExpression any, variablesFrom any, stateInstance 
statelang.StateInstance) any {
+       switch valueExpression.(type) {
+       case expr.Expression:
+               expression := valueExpression.(expr.Expression)
+               value := expression.Value(variablesFrom)
+               if _, ok := valueExpression.(expr.SequenceExpression); value != 
nil && stateInstance != nil && stateInstance.BusinessKey() == "" && ok {
+                       stateInstance.SetBusinessKey(fmt.Sprintf("%v", value))
+               }
+               return value
+       case map[string]any:
+               mapValueExpression := valueExpression.(map[string]any)
+               mapValue := make(map[string]any, len(mapValueExpression))
+               for key, value := range mapValueExpression {
+                       value = GetValue(value, variablesFrom, stateInstance)
+                       if value != nil {
+                               mapValue[key] = value
+                       }
+               }
+               return mapValue
+       case []any:
+               valueExpressionList := valueExpression.([]any)
+               listValue := make([]any, 0, len(valueExpression.([]any)))
+               for i, _ := range valueExpressionList {
+                       listValue = append(listValue, 
GetValue(valueExpressionList[i], variablesFrom, stateInstance))
+               }
+               return listValue
+       default:
+               return valueExpression
+       }
+}
diff --git a/pkg/saga/statemachine/engine/process_ctrl/process_context.go 
b/pkg/saga/statemachine/engine/core/process_context.go
similarity index 99%
rename from pkg/saga/statemachine/engine/process_ctrl/process_context.go
rename to pkg/saga/statemachine/engine/core/process_context.go
index 7d02e470..9da5c3e8 100644
--- a/pkg/saga/statemachine/engine/process_ctrl/process_context.go
+++ b/pkg/saga/statemachine/engine/core/process_context.go
@@ -1,4 +1,4 @@
-package process_ctrl
+package core
 
 import (
        "sync"
diff --git a/pkg/saga/statemachine/engine/core/process_controller.go 
b/pkg/saga/statemachine/engine/core/process_controller.go
new file mode 100644
index 00000000..2f45aebd
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/process_controller.go
@@ -0,0 +1,31 @@
+package core
+
+import (
+       "context"
+)
+
+type ProcessController interface {
+       Process(ctx context.Context, context ProcessContext) error
+}
+
+type ProcessControllerImpl struct {
+       businessProcessor BusinessProcessor
+}
+
+func (p *ProcessControllerImpl) Process(ctx context.Context, context 
ProcessContext) error {
+       if err := p.businessProcessor.Process(ctx, context); err != nil {
+               return err
+       }
+       if err := p.businessProcessor.Route(ctx, context); err != nil {
+               return err
+       }
+       return nil
+}
+
+func (p *ProcessControllerImpl) BusinessProcessor() BusinessProcessor {
+       return p.businessProcessor
+}
+
+func (p *ProcessControllerImpl) SetBusinessProcessor(businessProcessor 
BusinessProcessor) {
+       p.businessProcessor = businessProcessor
+}
diff --git 
a/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go 
b/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go
new file mode 100644
index 00000000..426ec65d
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go
@@ -0,0 +1,424 @@
+package core
+
+import (
+       "context"
+       "fmt"
+       "github.com/pkg/errors"
+       "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/exception"
+       "github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl/process"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
+       seataErrors "github.com/seata/seata-go/pkg/util/errors"
+       "github.com/seata/seata-go/pkg/util/log"
+       "time"
+)
+
+type ProcessCtrlStateMachineEngine struct {
+       StateMachineConfig StateMachineConfig
+}
+
+func NewProcessCtrlStateMachineEngine() *ProcessCtrlStateMachineEngine {
+       return &ProcessCtrlStateMachineEngine{
+               StateMachineConfig: NewDefaultStateMachineConfig(),
+       }
+}
+
+func (p ProcessCtrlStateMachineEngine) Start(ctx context.Context, 
stateMachineName string, tenantId string, startParams map[string]interface{}) 
(statelang.StateMachineInstance, error) {
+       return p.startInternal(ctx, stateMachineName, tenantId, "", 
startParams, false, nil)
+}
+
+func (p ProcessCtrlStateMachineEngine) Compensate(ctx context.Context, 
stateMachineInstId string,
+       replaceParams map[string]any) (statelang.StateMachineInstance, error) {
+       return p.compensateInternal(ctx, stateMachineInstId, replaceParams, 
false, nil)
+}
+
+func (p ProcessCtrlStateMachineEngine) startInternal(ctx context.Context, 
stateMachineName string, tenantId string,
+       businessKey string, startParams map[string]interface{}, async bool, 
callback CallBack) (statelang.StateMachineInstance, error) {
+       if tenantId == "" {
+               tenantId = p.StateMachineConfig.DefaultTenantId()
+       }
+
+       stateMachineInstance, err := p.createMachineInstance(stateMachineName, 
tenantId, businessKey, startParams)
+       if err != nil {
+               return nil, err
+       }
+
+       // Build the process_ctrl context.
+       processContextBuilder := NewProcessContextBuilder().
+               WithProcessType(process.StateLang).
+               WithOperationName(constant.OperationNameStart).
+               WithAsyncCallback(callback).
+               WithInstruction(NewStateInstruction(stateMachineName, 
tenantId)).
+               WithStateMachineInstance(stateMachineInstance).
+               WithStateMachineConfig(p.StateMachineConfig).
+               WithStateMachineEngine(p).
+               WithIsAsyncExecution(async)
+
+       contextMap := p.copyMap(startParams)
+
+       stateMachineInstance.SetContext(contextMap)
+
+       processContext := 
processContextBuilder.WithStateMachineContextVariables(contextMap).Build()
+
+       if stateMachineInstance.StateMachine().IsPersist() && 
p.StateMachineConfig.StateLogStore() != nil {
+               err := 
p.StateMachineConfig.StateLogStore().RecordStateMachineStarted(ctx, 
stateMachineInstance, processContext)
+               if err != nil {
+                       return nil, err
+               }
+       }
+
+       if stateMachineInstance.ID() == "" {
+               
stateMachineInstance.SetID(p.StateMachineConfig.SeqGenerator().GenerateId(constant.SeqEntityStateMachineInst,
 ""))
+       }
+
+       var eventPublisher EventPublisher
+       if async {
+               eventPublisher = p.StateMachineConfig.AsyncEventPublisher()
+       } else {
+               eventPublisher = p.StateMachineConfig.EventPublisher()
+       }
+
+       _, err = eventPublisher.PushEvent(ctx, processContext)
+       if err != nil {
+               return nil, err
+       }
+
+       return stateMachineInstance, nil
+}
+
+// copyMap not deep copy, so best practice: Don’t pass by reference
+func (p ProcessCtrlStateMachineEngine) copyMap(startParams 
map[string]interface{}) map[string]interface{} {
+       copyMap := make(map[string]interface{}, len(startParams))
+       for k, v := range startParams {
+               copyMap[k] = v
+       }
+       return copyMap
+}
+
+func (p ProcessCtrlStateMachineEngine) createMachineInstance(stateMachineName 
string, tenantId string, businessKey string, startParams 
map[string]interface{}) (statelang.StateMachineInstance, error) {
+       stateMachine, err := 
p.StateMachineConfig.StateMachineRepository().GetLastVersionStateMachine(stateMachineName,
 tenantId)
+       if err != nil {
+               return nil, err
+       }
+
+       if stateMachine == nil {
+               return nil, errors.New("StateMachine [" + stateMachineName + "] 
is not exists")
+       }
+
+       stateMachineInstance := statelang.NewStateMachineInstanceImpl()
+       stateMachineInstance.SetStateMachine(stateMachine)
+       stateMachineInstance.SetTenantID(tenantId)
+       stateMachineInstance.SetBusinessKey(businessKey)
+       stateMachineInstance.SetStartParams(startParams)
+       if startParams != nil {
+               if businessKey != "" {
+                       startParams[constant.VarNameBusinesskey] = businessKey
+               }
+
+               if startParams[constant.VarNameParentId] != nil {
+                       parentId, ok := 
startParams[constant.VarNameParentId].(string)
+                       if !ok {
+
+                       }
+                       stateMachineInstance.SetParentID(parentId)
+                       delete(startParams, constant.VarNameParentId)
+               }
+       }
+
+       stateMachineInstance.SetStatus(statelang.RU)
+       stateMachineInstance.SetRunning(true)
+
+       now := time.Now()
+       stateMachineInstance.SetStartedTime(now)
+       stateMachineInstance.SetUpdatedTime(now)
+       return stateMachineInstance, nil
+}
+
+func (p ProcessCtrlStateMachineEngine) compensateInternal(ctx context.Context, 
stateMachineInstId string, replaceParams map[string]any,
+       async bool, callback CallBack) (statelang.StateMachineInstance, error) {
+       stateMachineInstance, err := p.reloadStateMachineInstance(ctx, 
stateMachineInstId)
+       if err != nil {
+               return nil, err
+       }
+
+       if stateMachineInstance == nil {
+               return nil, 
exception.NewEngineExecutionException(seataErrors.StateMachineInstanceNotExists,
+                       "StateMachineInstance is not exits", nil)
+       }
+
+       if statelang.SU == stateMachineInstance.CompensationStatus() {
+               return stateMachineInstance, nil
+       }
+
+       if stateMachineInstance.CompensationStatus() != "" {
+               denyStatus := make([]statelang.ExecutionStatus, 0)
+               denyStatus = append(denyStatus, statelang.SU)
+               p.checkStatus(ctx, stateMachineInstance, nil, denyStatus, "", 
stateMachineInstance.CompensationStatus(),
+                       "compensate")
+       }
+
+       if replaceParams != nil {
+               for key, value := range replaceParams {
+                       stateMachineInstance.EndParams()[key] = value
+               }
+       }
+
+       contextBuilder := 
NewProcessContextBuilder().WithProcessType(process.StateLang).
+               
WithOperationName(constant.OperationNameCompensate).WithAsyncCallback(callback).
+               WithStateMachineInstance(stateMachineInstance).
+               
WithStateMachineConfig(p.StateMachineConfig).WithStateMachineEngine(p).WithIsAsyncExecution(async)
+
+       context := contextBuilder.Build()
+
+       contextVariables, err := p.getStateMachineContextVariables(ctx, 
stateMachineInstance)
+
+       if replaceParams != nil {
+               for key, value := range replaceParams {
+                       contextVariables[key] = value
+               }
+       }
+
+       p.putBusinesskeyToContextariables(stateMachineInstance, 
contextVariables)
+
+       // TODO: Here is not use sync.map, make sure whether to use it
+       concurrentContextVariables := make(map[string]any)
+       p.nullSafeCopy(contextVariables, concurrentContextVariables)
+
+       context.SetVariable(constant.VarNameStateMachineContext, 
concurrentContextVariables)
+       stateMachineInstance.SetContext(concurrentContextVariables)
+
+       tempCompensationTriggerState := state.NewCompensationTriggerStateImpl()
+       
tempCompensationTriggerState.SetStateMachine(stateMachineInstance.StateMachine())
+
+       stateMachineInstance.SetRunning(true)
+
+       log.Info("Operation [compensate] start.  stateMachineInstance[id:" + 
stateMachineInstance.ID() + "]")
+
+       if stateMachineInstance.StateMachine().IsPersist() {
+               err := 
p.StateMachineConfig.StateLogStore().RecordStateMachineRestarted(ctx, 
stateMachineInstance, context)
+               if err != nil {
+                       return nil, err
+               }
+       }
+
+       inst := NewStateInstruction(stateMachineInstance.TenantID(), 
stateMachineInstance.StateMachine().Name())
+       inst.SetTemporaryState(tempCompensationTriggerState)
+       context.SetInstruction(inst)
+
+       if async {
+               _, err := 
p.StateMachineConfig.AsyncEventPublisher().PushEvent(ctx, context)
+               if err != nil {
+                       return nil, err
+               }
+       } else {
+               _, err := p.StateMachineConfig.EventPublisher().PushEvent(ctx, 
context)
+               if err != nil {
+                       return nil, err
+               }
+       }
+
+       return stateMachineInstance, nil
+}
+
+func (p ProcessCtrlStateMachineEngine) reloadStateMachineInstance(ctx 
context.Context, instId string) (statelang.StateMachineInstance, error) {
+       instance, err := 
p.StateMachineConfig.StateLogStore().GetStateMachineInstance(instId)
+       if err != nil {
+               return nil, err
+       }
+       if instance != nil {
+               stateMachine := instance.StateMachine()
+               if stateMachine == nil {
+                       stateMachine, err = 
p.StateMachineConfig.StateMachineRepository().GetStateMachineById(instance.MachineID())
+                       if err != nil {
+                               return nil, err
+                       }
+                       instance.SetStateMachine(stateMachine)
+               }
+               if stateMachine == nil {
+                       return nil, 
exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
+                               "StateMachine[id:"+instance.MachineID()+"] not 
exist.", nil)
+               }
+
+               stateList := instance.StateList()
+               if stateList == nil || len(stateList) == 0 {
+                       stateList, err = 
p.StateMachineConfig.StateLogStore().GetStateInstanceListByMachineInstanceId(instId)
+                       if err != nil {
+                               return nil, err
+                       }
+                       if stateList != nil && len(stateList) > 0 {
+                               for _, tmpStateInstance := range stateList {
+                                       
instance.PutState(tmpStateInstance.ID(), tmpStateInstance)
+                               }
+                       }
+               }
+
+               if instance.EndParams() == nil || len(instance.EndParams()) == 
0 {
+                       variables, err := p.replayContextVariables(ctx, 
instance)
+                       if err != nil {
+                               return nil, err
+                       }
+                       instance.SetEndParams(variables)
+               }
+       }
+       return instance, nil
+}
+
+func (p ProcessCtrlStateMachineEngine) replayContextVariables(ctx 
context.Context, stateMachineInstance statelang.StateMachineInstance) 
(map[string]any, error) {
+       contextVariables := make(map[string]any)
+       if stateMachineInstance.StartParams() != nil {
+               for key, value := range stateMachineInstance.StartParams() {
+                       contextVariables[key] = value
+               }
+       }
+
+       stateInstanceList := stateMachineInstance.StateList()
+       if stateInstanceList == nil || len(stateInstanceList) == 0 {
+               return contextVariables, nil
+       }
+
+       for _, stateInstance := range stateInstanceList {
+               serviceOutputParams := stateInstance.OutputParams()
+               if serviceOutputParams != nil {
+                       serviceTaskStateImpl, ok := 
stateMachineInstance.StateMachine().State(GetOriginStateName(stateInstance)).(*state.ServiceTaskStateImpl)
+                       if !ok {
+                               return nil, 
exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
+                                       "Cannot find State by state name 
["+stateInstance.Name()+"], may be this is a bug", nil)
+                       }
+
+                       if serviceTaskStateImpl.Output() != nil && 
len(serviceTaskStateImpl.Output()) != 0 {
+                               outputVariablesToContext, err := 
CreateOutputParams(p.StateMachineConfig,
+                                       
p.StateMachineConfig.ExpressionResolver(), 
serviceTaskStateImpl.AbstractTaskState, serviceOutputParams)
+                               if err != nil {
+                                       return nil, 
exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
+                                               "Context variable replay 
failed", err)
+                               }
+                               if outputVariablesToContext != nil && 
len(outputVariablesToContext) != 0 {
+                                       for key, value := range 
outputVariablesToContext {
+                                               contextVariables[key] = value
+                                       }
+                               }
+                               if len(stateInstance.BusinessKey()) > 0 {
+                                       
contextVariables[serviceTaskStateImpl.Name()+constant.VarNameBusinesskey] = 
stateInstance.BusinessKey()
+                               }
+                       }
+               }
+       }
+
+       return contextVariables, nil
+}
+
+func (p ProcessCtrlStateMachineEngine) checkStatus(ctx context.Context, 
stateMachineInstance statelang.StateMachineInstance,
+       acceptStatus []statelang.ExecutionStatus, denyStatus 
[]statelang.ExecutionStatus, status statelang.ExecutionStatus,
+       compenStatus statelang.ExecutionStatus, operation string) (bool, error) 
{
+       if status != "" && compenStatus != "" {
+               return false, 
exception.NewEngineExecutionException(seataErrors.InvalidParameter,
+                       "status and compensationStatus are not supported at the 
same time", nil)
+       }
+       if status == "" && compenStatus == "" {
+               return false, 
exception.NewEngineExecutionException(seataErrors.InvalidParameter,
+                       "status and compensationStatus must input at least 
one", nil)
+       }
+       if statelang.SU == compenStatus {
+               message := p.buildExceptionMessage(stateMachineInstance, nil, 
nil, "", statelang.SU, operation)
+               return false, 
exception.NewEngineExecutionException(seataErrors.OperationDenied,
+                       message, nil)
+       }
+
+       if stateMachineInstance.IsRunning() &&
+               !IsTimeout(stateMachineInstance.UpdatedTime(), 
p.StateMachineConfig.TransOperationTimeout()) {
+               return false, 
exception.NewEngineExecutionException(seataErrors.OperationDenied,
+                       "StateMachineInstance 
[id:"+stateMachineInstance.ID()+"] is running, operation["+operation+
+                               "] denied", nil)
+       }
+
+       if (denyStatus == nil || len(denyStatus) == 0) && (acceptStatus == nil 
|| len(acceptStatus) == 0) {
+               return false, 
exception.NewEngineExecutionException(seataErrors.InvalidParameter,
+                       "StateMachineInstance[id:"+stateMachineInstance.ID()+
+                               "], acceptable status and deny status must 
input at least one", nil)
+       }
+
+       currentStatus := compenStatus
+       if status != "" {
+               currentStatus = status
+       }
+
+       if denyStatus != nil && len(denyStatus) == 0 {
+               for _, tempDenyStatus := range denyStatus {
+                       if tempDenyStatus == currentStatus {
+                               message := 
p.buildExceptionMessage(stateMachineInstance, acceptStatus, denyStatus, status,
+                                       compenStatus, operation)
+                               return false, 
exception.NewEngineExecutionException(seataErrors.OperationDenied,
+                                       message, nil)
+                       }
+               }
+       }
+
+       if acceptStatus == nil || len(acceptStatus) == 0 {
+               return true, nil
+       } else {
+               for _, tempStatus := range acceptStatus {
+                       if tempStatus == currentStatus {
+                               return true, nil
+                       }
+               }
+       }
+
+       message := p.buildExceptionMessage(stateMachineInstance, acceptStatus, 
denyStatus, status, compenStatus,
+               operation)
+       return false, 
exception.NewEngineExecutionException(seataErrors.OperationDenied,
+               message, nil)
+}
+
+func (p ProcessCtrlStateMachineEngine) getStateMachineContextVariables(ctx 
context.Context,
+       stateMachineInstance statelang.StateMachineInstance) (map[string]any, 
error) {
+       contextVariables := stateMachineInstance.EndParams()
+       if contextVariables == nil || len(contextVariables) == 0 {
+               return p.replayContextVariables(ctx, stateMachineInstance)
+       }
+       return contextVariables, nil
+}
+
+func (p ProcessCtrlStateMachineEngine) buildExceptionMessage(instance 
statelang.StateMachineInstance,
+       acceptStatus []statelang.ExecutionStatus, denyStatus 
[]statelang.ExecutionStatus, status statelang.ExecutionStatus,
+       compenStatus statelang.ExecutionStatus, operation string) string {
+       message := fmt.Sprintf("StateMachineInstance[id:%s]", instance.ID())
+       if len(acceptStatus) > 0 {
+               message += ",acceptable status :"
+               for _, tempStatus := range acceptStatus {
+                       message += string(tempStatus) + " "
+               }
+       }
+
+       if len(denyStatus) > 0 {
+               message += ",deny status:"
+               for _, tempStatus := range denyStatus {
+                       message += string(tempStatus) + " "
+               }
+       }
+
+       if status != "" {
+               message += ",current status:" + string(status)
+       }
+
+       if compenStatus != "" {
+               message += ",current compensation status:" + 
string(compenStatus)
+       }
+
+       message += fmt.Sprintf(",so operation [%s] denied", operation)
+       return message
+}
+
+func (p ProcessCtrlStateMachineEngine) 
putBusinesskeyToContextariables(instance statelang.StateMachineInstance, 
variables map[string]any) {
+       if instance.BusinessKey() != "" && 
variables[constant.VarNameBusinesskey] == "" {
+               variables[constant.VarNameBusinesskey] = instance.BusinessKey()
+       }
+}
+
+func (p ProcessCtrlStateMachineEngine) nullSafeCopy(srcMap map[string]any, 
destMap map[string]any) {
+       for key, value := range srcMap {
+               if value == nil {
+                       destMap[key] = value
+               }
+       }
+}
diff --git a/pkg/saga/statemachine/engine/core/process_router.go 
b/pkg/saga/statemachine/engine/core/process_router.go
new file mode 100644
index 00000000..5d7e5998
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/process_router.go
@@ -0,0 +1,194 @@
+package core
+
+import (
+       "context"
+       "github.com/pkg/errors"
+       "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+       "github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl/process"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+       "github.com/seata/seata-go/pkg/util/log"
+)
+
+type RouterHandler interface {
+       Route(ctx context.Context, processContext ProcessContext) error
+}
+
+type ProcessRouter interface {
+       Route(ctx context.Context, processContext ProcessContext) error
+}
+
+type InterceptAbleStateRouter interface {
+       StateRouter
+       StateRouterInterceptor() []StateRouterInterceptor
+       RegistryStateRouterInterceptor(stateRouterInterceptor 
StateRouterInterceptor)
+}
+
+type StateRouter interface {
+       Route(ctx context.Context, processContext ProcessContext, state 
statelang.State) (Instruction, error)
+}
+
+type StateRouterInterceptor interface {
+       PreRoute(ctx context.Context, processContext ProcessContext, state 
statelang.State) error
+       PostRoute(ctx context.Context, processContext ProcessContext, 
instruction Instruction, err error) error
+       Match(stateType string) bool
+}
+
+type DefaultRouterHandler struct {
+       eventPublisher EventPublisher
+       processRouters map[string]ProcessRouter
+}
+
+func (d *DefaultRouterHandler) Route(ctx context.Context, processContext 
ProcessContext) error {
+       processType := d.matchProcessType(ctx, processContext)
+       if processType == "" {
+               log.Warnf("Process type not found, context= %s", processContext)
+               return errors.New("Process type not found")
+       }
+
+       processRouter := d.processRouters[string(processType)]
+       if processRouter == nil {
+               log.Errorf("Cannot find process router by type %s, context = 
%s", processType, processContext)
+               return errors.New("Process router not found")
+       }
+
+       instruction := processRouter.Route(ctx, processContext)
+       if instruction == nil {
+               log.Info("route instruction is null, process end")
+       } else {
+               processContext.SetInstruction(instruction)
+               _, err := d.eventPublisher.PushEvent(ctx, processContext)
+               if err != nil {
+                       return err
+               }
+       }
+
+       return nil
+}
+
+func (d *DefaultRouterHandler) matchProcessType(ctx context.Context, 
processContext ProcessContext) process.ProcessType {
+       processType, ok := 
processContext.GetVariable(constant.VarNameProcessType).(process.ProcessType)
+       if !ok || processType == "" {
+               processType = process.StateLang
+       }
+       return processType
+}
+
+func (d *DefaultRouterHandler) EventPublisher() EventPublisher {
+       return d.eventPublisher
+}
+
+func (d *DefaultRouterHandler) SetEventPublisher(eventPublisher 
EventPublisher) {
+       d.eventPublisher = eventPublisher
+}
+
+func (d *DefaultRouterHandler) ProcessRouters() map[string]ProcessRouter {
+       return d.processRouters
+}
+
+func (d *DefaultRouterHandler) SetProcessRouters(processRouters 
map[string]ProcessRouter) {
+       d.processRouters = processRouters
+}
+
+type StateMachineProcessRouter struct {
+       stateRouters map[string]StateRouter
+}
+
+func (s *StateMachineProcessRouter) Route(ctx context.Context, processContext 
ProcessContext) (Instruction, error) {
+       stateInstruction, ok := 
processContext.GetInstruction().(StateInstruction)
+       if !ok {
+               return nil, errors.New("instruction is not a state instruction")
+       }
+
+       var state statelang.State
+       if stateInstruction.TemporaryState() != nil {
+               state = stateInstruction.TemporaryState()
+               stateInstruction.SetTemporaryState(nil)
+       } else {
+               stateMachineConfig, ok := 
processContext.GetVariable(constant.VarNameStateMachineConfig).(StateMachineConfig)
+               if !ok {
+                       return nil, errors.New("state machine config not found")
+               }
+
+               stateMachine, err := 
stateMachineConfig.StateMachineRepository().GetStateMachineByNameAndTenantId(stateInstruction.StateMachineName(),
+                       stateInstruction.TenantId())
+               if err != nil {
+                       return nil, err
+               }
+
+               state = stateMachine.States()[stateInstruction.StateName()]
+       }
+
+       stateType := state.Type()
+       router := s.stateRouters[stateType]
+
+       var interceptors []StateRouterInterceptor
+       if interceptAbleStateRouter, ok := router.(InterceptAbleStateRouter); 
ok {
+               interceptors = interceptAbleStateRouter.StateRouterInterceptor()
+       }
+
+       var executedInterceptors []StateRouterInterceptor
+       var exception error
+       instruction, exception := func() (Instruction, error) {
+               if interceptors == nil || len(executedInterceptors) == 0 {
+                       executedInterceptors = make([]StateRouterInterceptor, 
0, len(interceptors))
+                       for _, interceptor := range interceptors {
+                               executedInterceptors = 
append(executedInterceptors, interceptor)
+                               err := interceptor.PreRoute(ctx, 
processContext, state)
+                               if err != nil {
+                                       return nil, err
+                               }
+                       }
+               }
+
+               instruction, err := router.Route(ctx, processContext, state)
+               if err != nil {
+                       return nil, err
+               }
+               return instruction, nil
+       }()
+
+       if interceptors == nil || len(executedInterceptors) == 0 {
+               for i := len(executedInterceptors) - 1; i >= 0; i-- {
+                       err := executedInterceptors[i].PostRoute(ctx, 
processContext, instruction, exception)
+                       if err != nil {
+                               return nil, err
+                       }
+               }
+
+               // if 'Succeed' or 'Fail' State did not configured, we must end 
the state machine
+               if instruction == nil && !stateInstruction.End() {
+                       err := EndStateMachine(ctx, processContext)
+                       if err != nil {
+                               return nil, err
+                       }
+               }
+       }
+
+       return instruction, nil
+}
+
+func (s *StateMachineProcessRouter) InitDefaultStateRouters() {
+       if s.stateRouters == nil || len(s.stateRouters) == 0 {
+               s.stateRouters = make(map[string]StateRouter)
+               taskStateRouter := &TaskStateRouter{}
+               s.stateRouters[constant.StateTypeServiceTask] = taskStateRouter
+               s.stateRouters[constant.StateTypeScriptTask] = taskStateRouter
+               s.stateRouters[constant.StateTypeChoice] = taskStateRouter
+               s.stateRouters[constant.StateTypeCompensationTrigger] = 
taskStateRouter
+               s.stateRouters[constant.StateTypeSubStateMachine] = 
taskStateRouter
+               s.stateRouters[constant.StateTypeCompensateSubMachine] = 
taskStateRouter
+               s.stateRouters[constant.StateTypeLoopStart] = taskStateRouter
+
+               endStateRouter := &EndStateRouter{}
+               s.stateRouters[constant.StateTypeSucceed] = endStateRouter
+               s.stateRouters[constant.StateTypeFail] = endStateRouter
+       }
+}
+
+func (s *StateMachineProcessRouter) StateRouters() map[string]StateRouter {
+       return s.stateRouters
+}
+
+func (s *StateMachineProcessRouter) SetStateRouters(stateRouters 
map[string]StateRouter) {
+       s.stateRouters = stateRouters
+}
diff --git 
a/pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go 
b/pkg/saga/statemachine/engine/core/process_state.go
similarity index 69%
rename from pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go
rename to pkg/saga/statemachine/engine/core/process_state.go
index 66338fd8..3dc06920 100644
--- a/pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go
+++ b/pkg/saga/statemachine/engine/core/process_state.go
@@ -1,8 +1,8 @@
-package process_ctrl
+package core
 
 import (
        "context"
-       "github.com/pkg/errors"
+       "errors"
        "sync"
 )
 
@@ -11,20 +11,20 @@ type StateHandler interface {
        ProcessHandler
 }
 
-type StateRouter interface {
-       State() string
-       RouterHandler
-}
-
 type InterceptAbleStateHandler interface {
        StateHandler
        StateHandlerInterceptorList() []StateHandlerInterceptor
        RegistryStateHandlerInterceptor(stateHandlerInterceptor 
StateHandlerInterceptor)
 }
 
+type ProcessHandler interface {
+       Process(ctx context.Context, processContext ProcessContext) error
+}
+
 type StateHandlerInterceptor interface {
        PreProcess(ctx context.Context, processContext ProcessContext) error
        PostProcess(ctx context.Context, processContext ProcessContext) error
+       Match(stateType string) bool
 }
 
 type StateMachineProcessHandler struct {
@@ -99,40 +99,3 @@ func (s *StateMachineProcessHandler) 
RegistryStateHandler(stateType string, stat
        }
        s.mp[stateType] = stateHandler
 }
-
-type StateMachineRouterHandler struct {
-       mu sync.RWMutex
-       mp map[string]StateRouter
-}
-
-func (s *StateMachineRouterHandler) Route(ctx context.Context, processContext 
ProcessContext) error {
-       stateInstruction, _ := 
processContext.GetInstruction().(StateInstruction)
-
-       state, err := stateInstruction.GetState(processContext)
-       if err != nil {
-               return err
-       }
-
-       stateType := state.Type()
-       stateRouter := s.GetStateRouter(stateType)
-       if stateRouter == nil {
-               return errors.New("Not support [" + stateType + "] state 
router")
-       }
-
-       return stateRouter.Route(ctx, processContext)
-}
-
-func (s *StateMachineRouterHandler) GetStateRouter(stateType string) 
StateRouter {
-       s.mu.RLock()
-       defer s.mu.RUnlock()
-       return s.mp[stateType]
-}
-
-func (s *StateMachineRouterHandler) RegistryStateRouter(stateType string, 
stateRouter StateRouter) {
-       s.mu.Lock()
-       defer s.mu.Unlock()
-       if s.mp == nil {
-               s.mp = make(map[string]StateRouter)
-       }
-       s.mp[stateType] = stateRouter
-}
diff --git a/pkg/saga/statemachine/engine/core/state_router.go 
b/pkg/saga/statemachine/engine/core/state_router.go
new file mode 100644
index 00000000..185984e4
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/state_router.go
@@ -0,0 +1,151 @@
+package core
+
+import (
+       "context"
+       "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/exception"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+       sagaState 
"github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
+       seataErrors "github.com/seata/seata-go/pkg/util/errors"
+       "github.com/seata/seata-go/pkg/util/log"
+)
+
+type EndStateRouter struct {
+}
+
+func (e EndStateRouter) Route(ctx context.Context, processContext 
ProcessContext, state statelang.State) (Instruction, error) {
+       return nil, nil
+}
+
+type TaskStateRouter struct {
+}
+
+func (t TaskStateRouter) Route(ctx context.Context, processContext 
ProcessContext, state statelang.State) (Instruction, error) {
+       stateInstruction, _ := 
processContext.GetInstruction().(StateInstruction)
+       if stateInstruction.End() {
+               log.Infof("StateInstruction is ended, Stop the StateMachine 
executing. StateMachine[%s] Current State[%s]",
+                       stateInstruction.StateMachineName(), 
stateInstruction.StateName())
+       }
+
+       // check if in loop async condition
+       isLoop, ok := 
processContext.GetVariable(constant.VarNameIsLoopState).(bool)
+       if ok && isLoop {
+               log.Infof("StateMachine[%s] Current State[%s] is in loop async 
condition, skip route processing.",
+                       stateInstruction.StateMachineName(), 
stateInstruction.StateName())
+               return nil, nil
+       }
+
+       // The current CompensationTriggerState can mark the compensation 
process is started and perform compensation
+       // route processing.
+       compensationTriggerState, ok := 
processContext.GetVariable(constant.VarNameCurrentCompensateTriggerState).(statelang.State)
+       if ok {
+               return t.compensateRoute(ctx, processContext, 
compensationTriggerState)
+       }
+
+       // There is an exception route, indicating that an exception is thrown, 
and the exception route is prioritized.
+       next := 
processContext.GetVariable(constant.VarNameCurrentExceptionRoute).(string)
+
+       if next != "" {
+               
processContext.RemoveVariable(constant.VarNameCurrentExceptionRoute)
+       } else {
+               next = state.Next()
+       }
+
+       // If next is empty, the state selected by the Choice state was taken.
+       if next == "" && 
processContext.HasVariable(constant.VarNameCurrentChoice) {
+               next = 
processContext.GetVariable(constant.VarNameCurrentChoice).(string)
+               processContext.RemoveVariable(constant.VarNameCurrentChoice)
+       }
+
+       if next == "" {
+               return nil, nil
+       }
+
+       stateMachine := state.StateMachine()
+       nextState := stateMachine.State(next)
+       if nextState == nil {
+               return nil, 
exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
+                       "Next state["+next+"] is not exits", nil)
+       }
+
+       stateInstruction.SetStateName(next)
+
+       if nil != GetLoopConfig(ctx, processContext, nextState) {
+               
stateInstruction.SetTemporaryState(sagaState.NewLoopStartStateImpl())
+       }
+
+       return stateInstruction, nil
+}
+
+func (t *TaskStateRouter) compensateRoute(ctx context.Context, processContext 
ProcessContext,
+       compensationTriggerState statelang.State) (Instruction, error) {
+       //If there is already a compensation state that has been executed,
+       // it is judged whether it is wrong or unsuccessful,
+       // and the compensation process is interrupted.
+       isFirstCompensationStateStart := 
processContext.GetVariable(constant.VarNameFirstCompensationStateStarted).(bool)
+       if isFirstCompensationStateStart {
+               exception := 
processContext.GetVariable(constant.VarNameCurrentException).(error)
+               if exception != nil {
+                       return nil, EndStateMachine(ctx, processContext)
+               }
+
+               stateInstance := 
processContext.GetVariable(constant.VarNameStateInst).(statelang.StateInstance)
+               if stateInstance != nil && statelang.SU != 
stateInstance.Status() {
+                       return nil, EndStateMachine(ctx, processContext)
+               }
+       }
+
+       stateStackToBeCompensated := GetCurrentCompensationHolder(ctx, 
processContext, true).StateStackNeedCompensation()
+       if stateStackToBeCompensated != nil {
+               stateToBeCompensated := 
stateStackToBeCompensated.Pop().(statelang.StateInstance)
+
+               stateMachine := 
processContext.GetVariable(constant.VarNameStateMachine).(statelang.StateMachine)
+               state := 
stateMachine.State(GetOriginStateName(stateToBeCompensated))
+               if taskState, ok := state.(sagaState.AbstractTaskState); ok {
+                       instruction := 
processContext.GetInstruction().(StateInstruction)
+
+                       var compensateState statelang.State
+                       compensateStateName := taskState.CompensateState()
+                       if len(compensateStateName) != 0 {
+                               compensateState = 
stateMachine.State(compensateStateName)
+                       }
+
+                       if subStateMachine, ok := 
state.(sagaState.SubStateMachine); compensateState == nil && ok {
+                               compensateState = 
subStateMachine.CompensateStateImpl()
+                               instruction.SetTemporaryState(compensateState)
+                       }
+
+                       if compensateState == nil {
+                               return nil, EndStateMachine(ctx, processContext)
+                       }
+
+                       instruction.SetStateName(compensateState.Name())
+
+                       GetCurrentCompensationHolder(ctx, processContext, 
true).AddToBeCompensatedState(compensateState.Name(),
+                               stateToBeCompensated)
+
+                       hierarchicalProcessContext := 
processContext.(HierarchicalProcessContext)
+                       
hierarchicalProcessContext.SetVariableLocally(constant.VarNameFirstCompensationStateStarted,
 true)
+
+                       if _, ok := 
compensateState.(sagaState.CompensateSubStateMachineState); ok {
+                               hierarchicalProcessContext = 
processContext.(HierarchicalProcessContext)
+                               hierarchicalProcessContext.SetVariableLocally(
+                                       
compensateState.Name()+constant.VarNameSubMachineParentId,
+                                       GenerateParentId(stateToBeCompensated))
+                       }
+
+                       return instruction, nil
+               }
+       }
+
+       
processContext.RemoveVariable(constant.VarNameCurrentCompensateTriggerState)
+
+       compensationTriggerStateNext := compensationTriggerState.Next()
+       if compensationTriggerStateNext == "" {
+               return nil, EndStateMachine(ctx, processContext)
+       }
+
+       instruction := processContext.GetInstruction().(StateInstruction)
+       instruction.SetStateName(compensationTriggerStateNext)
+       return instruction, nil
+}
diff --git a/pkg/saga/statemachine/engine/statemachine_config.go 
b/pkg/saga/statemachine/engine/core/statemachine_config.go
similarity index 52%
rename from pkg/saga/statemachine/engine/statemachine_config.go
rename to pkg/saga/statemachine/engine/core/statemachine_config.go
index 3f3c530b..9c77b73a 100644
--- a/pkg/saga/statemachine/engine/statemachine_config.go
+++ b/pkg/saga/statemachine/engine/core/statemachine_config.go
@@ -1,22 +1,20 @@
-package engine
+package core
 
 import (
-       "github.com/seata/seata-go/pkg/saga/statemachine/engine/events"
        "github.com/seata/seata-go/pkg/saga/statemachine/engine/expr"
        "github.com/seata/seata-go/pkg/saga/statemachine/engine/invoker"
        "github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence"
-       "github.com/seata/seata-go/pkg/saga/statemachine/engine/status_decision"
-       "github.com/seata/seata-go/pkg/saga/statemachine/engine/store"
+       "sync"
 )
 
 type StateMachineConfig interface {
-       StateLogRepository() store.StateLogRepository
+       StateLogRepository() StateLogRepository
 
-       StateMachineRepository() store.StateMachineRepository
+       StateMachineRepository() StateMachineRepository
 
-       StateLogStore() store.StateLogStore
+       StateLogStore() StateLogStore
 
-       StateLangStore() store.StateLangStore
+       StateLangStore() StateLangStore
 
        ExpressionFactoryManager() expr.ExpressionFactoryManager
 
@@ -24,11 +22,11 @@ type StateMachineConfig interface {
 
        SeqGenerator() sequence.SeqGenerator
 
-       StatusDecisionStrategy() status_decision.StatusDecisionStrategy
+       StatusDecisionStrategy() StatusDecisionStrategy
 
-       EventPublisher() events.EventPublisher
+       EventPublisher() EventPublisher
 
-       AsyncEventPublisher() events.EventPublisher
+       AsyncEventPublisher() EventPublisher
 
        ServiceInvokerManager() invoker.ServiceInvokerManager
 
@@ -41,4 +39,6 @@ type StateMachineConfig interface {
        TransOperationTimeout() int
 
        ServiceInvokeTimeout() int
+
+       ComponentLock() *sync.Mutex
 }
diff --git a/pkg/saga/statemachine/engine/core/statemachine_engine.go 
b/pkg/saga/statemachine/engine/core/statemachine_engine.go
new file mode 100644
index 00000000..c18d91c1
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/statemachine_engine.go
@@ -0,0 +1,16 @@
+package core
+
+import (
+       "context"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+)
+
+type StateMachineEngine interface {
+       Start(ctx context.Context, stateMachineName string, tenantId string, 
startParams map[string]interface{}) (statelang.StateMachineInstance, error)
+       Compensate(ctx context.Context, stateMachineInstId string, 
replaceParams map[string]any) (statelang.StateMachineInstance, error)
+}
+
+type CallBack interface {
+       OnFinished(ctx context.Context, context ProcessContext, 
stateMachineInstance statelang.StateMachineInstance)
+       OnError(ctx context.Context, context ProcessContext, 
stateMachineInstance statelang.StateMachineInstance, err error)
+}
diff --git a/pkg/saga/statemachine/engine/core/statemachine_engine_test.go 
b/pkg/saga/statemachine/engine/core/statemachine_engine_test.go
new file mode 100644
index 00000000..e87eb9ca
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/statemachine_engine_test.go
@@ -0,0 +1,15 @@
+package core
+
+import (
+       "context"
+       "testing"
+)
+
+func TestEngine(t *testing.T) {
+
+}
+
+func TestSimpleStateMachine(t *testing.T) {
+       engine := NewProcessCtrlStateMachineEngine()
+       engine.Start(context.Background(), "simpleStateMachine", "tenantId", 
nil)
+}
diff --git a/pkg/saga/statemachine/engine/store/statemachine_store.go 
b/pkg/saga/statemachine/engine/core/statemachine_store.go
similarity index 83%
rename from pkg/saga/statemachine/engine/store/statemachine_store.go
rename to pkg/saga/statemachine/engine/core/statemachine_store.go
index 2dac06d3..7a763335 100644
--- a/pkg/saga/statemachine/engine/store/statemachine_store.go
+++ b/pkg/saga/statemachine/engine/core/statemachine_store.go
@@ -1,8 +1,7 @@
-package store
+package core
 
 import (
        "context"
-       "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
        "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
        "io"
 )
@@ -20,15 +19,15 @@ type StateLogRepository interface {
 }
 
 type StateLogStore interface {
-       RecordStateMachineStarted(ctx context.Context, machineInstance 
statelang.StateMachineInstance, context process_ctrl.ProcessContext) error
+       RecordStateMachineStarted(ctx context.Context, machineInstance 
statelang.StateMachineInstance, context ProcessContext) error
 
-       RecordStateMachineFinished(ctx context.Context, machineInstance 
statelang.StateMachineInstance, context process_ctrl.ProcessContext) error
+       RecordStateMachineFinished(ctx context.Context, machineInstance 
statelang.StateMachineInstance, context ProcessContext) error
 
-       RecordStateMachineRestarted(ctx context.Context, machineInstance 
statelang.StateMachineInstance, context process_ctrl.ProcessContext) error
+       RecordStateMachineRestarted(ctx context.Context, machineInstance 
statelang.StateMachineInstance, context ProcessContext) error
 
-       RecordStateStarted(ctx context.Context, stateInstance 
statelang.StateInstance, context process_ctrl.ProcessContext) error
+       RecordStateStarted(ctx context.Context, stateInstance 
statelang.StateInstance, context ProcessContext) error
 
-       RecordStateFinished(ctx context.Context, stateInstance 
statelang.StateInstance, context process_ctrl.ProcessContext) error
+       RecordStateFinished(ctx context.Context, stateInstance 
statelang.StateInstance, context ProcessContext) error
 
        GetStateMachineInstance(stateMachineInstanceId string) 
(statelang.StateMachineInstance, error)
 
@@ -44,6 +43,8 @@ type StateLogStore interface {
 type StateMachineRepository interface {
        GetStateMachineById(stateMachineId string) (statelang.StateMachine, 
error)
 
+       GetStateMachineByNameAndTenantId(stateMachineName string, tenantId 
string) (statelang.StateMachine, error)
+
        GetLastVersionStateMachine(stateMachineName string, tenantId string) 
(statelang.StateMachine, error)
 
        RegistryStateMachine(statelang.StateMachine) error
diff --git a/pkg/saga/statemachine/engine/core/status_decision.go 
b/pkg/saga/statemachine/engine/core/status_decision.go
new file mode 100644
index 00000000..e95a6316
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/status_decision.go
@@ -0,0 +1,188 @@
+package core
+
+import (
+       "context"
+       "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+       "github.com/seata/seata-go/pkg/util/log"
+)
+
+type StatusDecisionStrategy interface {
+       // DecideOnEndState Determine state machine execution status when 
executing to EndState
+       DecideOnEndState(ctx context.Context, processContext ProcessContext,
+               stateMachineInstance statelang.StateMachineInstance, exp error) 
error
+       // DecideOnTaskStateFail Determine state machine execution status when 
executing TaskState error
+       DecideOnTaskStateFail(ctx context.Context, processContext 
ProcessContext,
+               stateMachineInstance statelang.StateMachineInstance, exp error) 
error
+       // DecideMachineForwardExecutionStatus Determine the forward execution 
state of the state machine
+       DecideMachineForwardExecutionStatus(ctx context.Context,
+               stateMachineInstance statelang.StateMachineInstance, exp error, 
specialPolicy bool) error
+}
+
+type DefaultStatusDecisionStrategy struct {
+}
+
+func NewDefaultStatusDecisionStrategy() *DefaultStatusDecisionStrategy {
+       return &DefaultStatusDecisionStrategy{}
+}
+
+func (d DefaultStatusDecisionStrategy) DecideOnEndState(ctx context.Context, 
processContext ProcessContext,
+       stateMachineInstance statelang.StateMachineInstance, exp error) error {
+       if statelang.RU == stateMachineInstance.CompensationStatus() {
+               compensationHolder := GetCurrentCompensationHolder(ctx, 
processContext, true)
+               if err := decideMachineCompensateStatus(ctx, 
stateMachineInstance, compensationHolder); err != nil {
+                       return err
+               }
+       } else {
+               failEndStateFlag, ok := 
processContext.GetVariable(constant.VarNameFailEndStateFlag).(bool)
+               if !ok {
+                       failEndStateFlag = false
+               }
+               if _, err := decideMachineForwardExecutionStatus(ctx, 
stateMachineInstance, exp, failEndStateFlag); err != nil {
+                       return err
+               }
+       }
+
+       if stateMachineInstance.CompensationStatus() != "" && 
constant.OperationNameForward ==
+               
processContext.GetVariable(constant.VarNameOperationName).(string) && 
statelang.SU == stateMachineInstance.Status() {
+               stateMachineInstance.SetCompensationStatus(statelang.FA)
+       }
+
+       log.Debugf("StateMachine Instance[id:%s,name:%s] execute finish with 
status[%s], compensation status [%s].",
+               stateMachineInstance.ID(), 
stateMachineInstance.StateMachine().Name(),
+               stateMachineInstance.Status(), 
stateMachineInstance.CompensationStatus())
+
+       return nil
+}
+
+func decideMachineCompensateStatus(ctx context.Context, stateMachineInstance 
statelang.StateMachineInstance, compensationHolder *CompensationHolder) error {
+       if stateMachineInstance.Status() == "" || statelang.RU == 
stateMachineInstance.Status() {
+               stateMachineInstance.SetStatus(statelang.UN)
+       }
+       if !compensationHolder.StateStackNeedCompensation().Empty() {
+               hasCompensateSUorUN := false
+               compensationHolder.StatesForCompensation().Range(
+                       func(key, value any) bool {
+                               stateInstance, ok := 
value.(statelang.StateInstance)
+                               if !ok {
+                                       return false
+                               }
+                               if statelang.UN == stateInstance.Status() || 
statelang.SU == stateInstance.Status() {
+                                       hasCompensateSUorUN = true
+                                       return true
+                               }
+                               return false
+                       })
+
+               if hasCompensateSUorUN {
+                       stateMachineInstance.SetCompensationStatus(statelang.UN)
+               } else {
+                       stateMachineInstance.SetCompensationStatus(statelang.FA)
+               }
+       } else {
+               hasCompensateError := false
+               compensationHolder.StatesForCompensation().Range(
+                       func(key, value any) bool {
+                               stateInstance, ok := 
value.(statelang.StateInstance)
+                               if !ok {
+                                       return false
+                               }
+                               if statelang.SU != stateInstance.Status() {
+                                       hasCompensateError = true
+                                       return true
+                               }
+                               return false
+                       })
+
+               if hasCompensateError {
+                       stateMachineInstance.SetCompensationStatus(statelang.UN)
+               } else {
+                       stateMachineInstance.SetCompensationStatus(statelang.SU)
+               }
+       }
+       return nil
+}
+
+func decideMachineForwardExecutionStatus(ctx context.Context, 
stateMachineInstance statelang.StateMachineInstance, exp error, specialPolicy 
bool) (bool, error) {
+       result := false
+
+       if stateMachineInstance.Status() == "" || statelang.RU == 
stateMachineInstance.Status() {
+               result = true
+               stateList := stateMachineInstance.StateList()
+               
setMachineStatusBasedOnStateListAndException(stateMachineInstance, stateList, 
exp)
+
+               if specialPolicy && statelang.SU == 
stateMachineInstance.Status() {
+                       for _, stateInstance := range 
stateMachineInstance.StateList() {
+                               if !stateInstance.IsIgnoreStatus() && 
(stateInstance.IsForUpdate() || stateInstance.IsForCompensation()) {
+                                       
stateMachineInstance.SetStatus(statelang.UN)
+                                       break
+                               }
+                       }
+                       if statelang.SU == stateMachineInstance.Status() {
+                               stateMachineInstance.SetStatus(statelang.FA)
+                       }
+               }
+       }
+       return result, nil
+}
+
+func setMachineStatusBasedOnStateListAndException(stateMachineInstance 
statelang.StateMachineInstance,
+       stateList []statelang.StateInstance, exp error) {
+       hasSetStatus := false
+       hasSuccessUpdateService := false
+       if stateList != nil && len(stateList) > 0 {
+               hasUnsuccessService := false
+
+               for i := len(stateList) - 1; i >= 0; i-- {
+                       stateInstance := stateList[i]
+
+                       if stateInstance.IsIgnoreStatus() || 
stateInstance.IsForCompensation() {
+                               continue
+                       }
+                       if statelang.UN == stateInstance.Status() {
+                               stateMachineInstance.SetStatus(statelang.UN)
+                               hasSetStatus = true
+                       } else if statelang.SU == stateInstance.Status() {
+                               if constant.StateTypeServiceTask == 
stateInstance.Type() {
+                                       if stateInstance.IsForUpdate() && 
!stateInstance.IsForCompensation() {
+                                               hasSuccessUpdateService = true
+                                       }
+                               }
+                       } else if statelang.SK == stateInstance.Status() {
+                               // ignore
+                       } else {
+                               hasUnsuccessService = true
+                       }
+               }
+
+               if !hasSetStatus && hasUnsuccessService {
+                       if hasSuccessUpdateService {
+                               stateMachineInstance.SetStatus(statelang.UN)
+                       } else {
+                               stateMachineInstance.SetStatus(statelang.FA)
+                       }
+                       hasSetStatus = true
+               }
+       }
+
+       if !hasSetStatus {
+               setMachineStatusBasedOnException(stateMachineInstance, exp, 
hasSuccessUpdateService)
+       }
+}
+
+func setMachineStatusBasedOnException(stateMachineInstance 
statelang.StateMachineInstance, exp error, hasSuccessUpdateService bool) {
+       //TODO implement me
+       panic("implement me")
+}
+
+func (d DefaultStatusDecisionStrategy) DecideOnTaskStateFail(ctx 
context.Context, processContext ProcessContext,
+       stateMachineInstance statelang.StateMachineInstance, exp error) error {
+       //TODO implement me
+       panic("implement me")
+}
+
+func (d DefaultStatusDecisionStrategy) DecideMachineForwardExecutionStatus(ctx 
context.Context,
+       stateMachineInstance statelang.StateMachineInstance, exp error, 
specialPolicy bool) error {
+       //TODO implement me
+       panic("implement me")
+}
diff --git a/pkg/saga/statemachine/engine/utils.go 
b/pkg/saga/statemachine/engine/core/utils.go
similarity index 84%
rename from pkg/saga/statemachine/engine/utils.go
rename to pkg/saga/statemachine/engine/core/utils.go
index 92b7aa10..8300c0eb 100644
--- a/pkg/saga/statemachine/engine/utils.go
+++ b/pkg/saga/statemachine/engine/core/utils.go
@@ -1,22 +1,22 @@
-package engine
+package core
 
 import (
        "github.com/seata/seata-go/pkg/saga/statemachine/constant"
-       "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
+       "github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl/process"
        "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
 )
 
 // ProcessContextBuilder process_ctrl builder
 type ProcessContextBuilder struct {
-       processContext process_ctrl.ProcessContext
+       processContext ProcessContext
 }
 
 func NewProcessContextBuilder() *ProcessContextBuilder {
-       processContextImpl := process_ctrl.NewProcessContextImpl()
+       processContextImpl := NewProcessContextImpl()
        return &ProcessContextBuilder{processContextImpl}
 }
 
-func (p *ProcessContextBuilder) WithProcessType(processType 
process_ctrl.ProcessType) *ProcessContextBuilder {
+func (p *ProcessContextBuilder) WithProcessType(processType 
process.ProcessType) *ProcessContextBuilder {
        p.processContext.SetVariable(constant.VarNameProcessType, processType)
        return p
 }
@@ -34,7 +34,7 @@ func (p *ProcessContextBuilder) WithAsyncCallback(callBack 
CallBack) *ProcessCon
        return p
 }
 
-func (p *ProcessContextBuilder) WithInstruction(instruction 
process_ctrl.Instruction) *ProcessContextBuilder {
+func (p *ProcessContextBuilder) WithInstruction(instruction Instruction) 
*ProcessContextBuilder {
        if instruction != nil {
                p.processContext.SetInstruction(instruction)
        }
@@ -81,6 +81,6 @@ func (p *ProcessContextBuilder) WithIsAsyncExecution(async 
bool) *ProcessContext
        return p
 }
 
-func (p *ProcessContextBuilder) Build() process_ctrl.ProcessContext {
+func (p *ProcessContextBuilder) Build() ProcessContext {
        return p.processContext
 }
diff --git a/pkg/saga/statemachine/engine/default_statemachine_config.go 
b/pkg/saga/statemachine/engine/default_statemachine_config.go
deleted file mode 100644
index 2d7b1d8e..00000000
--- a/pkg/saga/statemachine/engine/default_statemachine_config.go
+++ /dev/null
@@ -1,123 +0,0 @@
-package engine
-
-import (
-       "github.com/seata/seata-go/pkg/saga/statemachine/engine/events"
-       "github.com/seata/seata-go/pkg/saga/statemachine/engine/expr"
-       "github.com/seata/seata-go/pkg/saga/statemachine/engine/invoker"
-       "github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence"
-       "github.com/seata/seata-go/pkg/saga/statemachine/engine/status_decision"
-       "github.com/seata/seata-go/pkg/saga/statemachine/engine/store"
-)
-
-const (
-       DefaultTransOperTimeout     = 60000 * 30
-       DefaultServiceInvokeTimeout = 60000 * 5
-)
-
-type DefaultStateMachineConfig struct {
-       // Configuration
-       transOperationTimeout int
-       serviceInvokeTimeout  int
-       charset               string
-       defaultTenantId       string
-
-       // Components
-
-       // Store related components
-       stateLogRepository     store.StateLogRepository
-       stateLogStore          store.StateLogStore
-       stateLangStore         store.StateLangStore
-       stateMachineRepository store.StateMachineRepository
-
-       // Expression related components
-       expressionFactoryManager expr.ExpressionFactoryManager
-       expressionResolver       expr.ExpressionResolver
-
-       // Invoker related components
-       serviceInvokerManager invoker.ServiceInvokerManager
-       scriptInvokerManager  invoker.ScriptInvokerManager
-
-       // Other components
-       statusDecisionStrategy status_decision.StatusDecisionStrategy
-       seqGenerator           sequence.SeqGenerator
-}
-
-func (c *DefaultStateMachineConfig) StateLogRepository() 
store.StateLogRepository {
-       return c.stateLogRepository
-}
-
-func (c *DefaultStateMachineConfig) StateMachineRepository() 
store.StateMachineRepository {
-       return c.stateMachineRepository
-}
-
-func (c *DefaultStateMachineConfig) StateLogStore() store.StateLogStore {
-       return c.stateLogStore
-}
-
-func (c *DefaultStateMachineConfig) StateLangStore() store.StateLangStore {
-       return c.stateLangStore
-}
-
-func (c *DefaultStateMachineConfig) ExpressionFactoryManager() 
expr.ExpressionFactoryManager {
-       return c.expressionFactoryManager
-}
-
-func (c *DefaultStateMachineConfig) ExpressionResolver() 
expr.ExpressionResolver {
-       return c.expressionResolver
-}
-
-func (c *DefaultStateMachineConfig) SeqGenerator() sequence.SeqGenerator {
-       return c.seqGenerator
-}
-
-func (c *DefaultStateMachineConfig) StatusDecisionStrategy() 
status_decision.StatusDecisionStrategy {
-       return c.statusDecisionStrategy
-}
-
-func (c *DefaultStateMachineConfig) EventPublisher() events.EventPublisher {
-       //TODO implement me
-       panic("implement me")
-}
-
-func (c *DefaultStateMachineConfig) AsyncEventPublisher() 
events.EventPublisher {
-       //TODO implement me
-       panic("implement me")
-}
-
-func (c *DefaultStateMachineConfig) ServiceInvokerManager() 
invoker.ServiceInvokerManager {
-       return c.serviceInvokerManager
-}
-
-func (c *DefaultStateMachineConfig) ScriptInvokerManager() 
invoker.ScriptInvokerManager {
-       return c.scriptInvokerManager
-}
-
-func (c *DefaultStateMachineConfig) CharSet() string {
-       return c.charset
-}
-
-func (c *DefaultStateMachineConfig) SetCharSet(charset string) {
-       c.charset = charset
-}
-
-func (c *DefaultStateMachineConfig) DefaultTenantId() string {
-       return c.defaultTenantId
-}
-
-func (c *DefaultStateMachineConfig) TransOperationTimeout() int {
-       return c.transOperationTimeout
-}
-
-func (c *DefaultStateMachineConfig) ServiceInvokeTimeout() int {
-       return c.serviceInvokeTimeout
-}
-
-func NewDefaultStateMachineConfig() *DefaultStateMachineConfig {
-       c := &DefaultStateMachineConfig{
-               transOperationTimeout: DefaultTransOperTimeout,
-               serviceInvokeTimeout:  DefaultServiceInvokeTimeout,
-               charset:               "UTF-8",
-               defaultTenantId:       "000001",
-       }
-       return c
-}
diff --git a/pkg/saga/statemachine/engine/events/event_bus.go 
b/pkg/saga/statemachine/engine/events/event_bus.go
deleted file mode 100644
index 77a1e616..00000000
--- a/pkg/saga/statemachine/engine/events/event_bus.go
+++ /dev/null
@@ -1,49 +0,0 @@
-package events
-
-import (
-       "context"
-)
-
-type EventBus interface {
-       Offer(ctx context.Context, event Event) (bool, error)
-
-       RegisterEventConsumer(consumer EventConsumer)
-}
-
-type BaseEventBus struct {
-       eventConsumerList []EventConsumer
-}
-
-func (b *BaseEventBus) RegisterEventConsumer(consumer EventConsumer) {
-       if b.eventConsumerList == nil {
-               b.eventConsumerList = make([]EventConsumer, 0)
-       }
-       b.eventConsumerList = append(b.eventConsumerList, consumer)
-}
-
-func (b *BaseEventBus) GetEventConsumerList(event Event) []EventConsumer {
-       var acceptedConsumerList = make([]EventConsumer, 0)
-       for i := range b.eventConsumerList {
-               eventConsumer := b.eventConsumerList[i]
-               if eventConsumer.Accept(event) {
-                       acceptedConsumerList = append(acceptedConsumerList, 
eventConsumer)
-               }
-       }
-       return acceptedConsumerList
-}
-
-type DirectEventBus struct {
-       BaseEventBus
-}
-
-func (d DirectEventBus) Offer(ctx context.Context, event Event) (bool, error) {
-       eventConsumerList := d.GetEventConsumerList(event)
-       if len(eventConsumerList) == 0 {
-               //TODO logger
-               return false, nil
-       }
-
-       //processContext, _ := event.(process_ctrl.ProcessContext)
-
-       return true, nil
-}
diff --git a/pkg/saga/statemachine/engine/exception/exception.go 
b/pkg/saga/statemachine/engine/exception/exception.go
new file mode 100644
index 00000000..f5c58239
--- /dev/null
+++ b/pkg/saga/statemachine/engine/exception/exception.go
@@ -0,0 +1,54 @@
+package exception
+
+import "github.com/seata/seata-go/pkg/util/errors"
+
+type EngineExecutionException struct {
+       errors.SeataError
+       stateName              string
+       stateMachineName       string
+       stateMachineInstanceId string
+       stateInstanceId        string
+}
+
+func NewEngineExecutionException(code errors.TransactionErrorCode, msg string, 
parent error) *EngineExecutionException {
+       seataError := errors.New(code, msg, parent)
+       return &EngineExecutionException{
+               SeataError: *seataError,
+       }
+}
+
+func (e *EngineExecutionException) StateName() string {
+       return e.stateName
+}
+
+func (e *EngineExecutionException) SetStateName(stateName string) {
+       e.stateName = stateName
+}
+
+func (e *EngineExecutionException) StateMachineName() string {
+       return e.stateMachineName
+}
+
+func (e *EngineExecutionException) SetStateMachineName(stateMachineName 
string) {
+       e.stateMachineName = stateMachineName
+}
+
+func (e *EngineExecutionException) StateMachineInstanceId() string {
+       return e.stateMachineInstanceId
+}
+
+func (e *EngineExecutionException) 
SetStateMachineInstanceId(stateMachineInstanceId string) {
+       e.stateMachineInstanceId = stateMachineInstanceId
+}
+
+func (e *EngineExecutionException) StateInstanceId() string {
+       return e.stateInstanceId
+}
+
+func (e *EngineExecutionException) SetStateInstanceId(stateInstanceId string) {
+       e.stateInstanceId = stateInstanceId
+}
+
+type ForwardInvalidException struct {
+       EngineExecutionException
+}
diff --git a/pkg/saga/statemachine/engine/expr/expression.go 
b/pkg/saga/statemachine/engine/expr/expression.go
index 9706cda5..767c49da 100644
--- a/pkg/saga/statemachine/engine/expr/expression.go
+++ b/pkg/saga/statemachine/engine/expr/expression.go
@@ -1,10 +1,93 @@
 package expr
 
+import (
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence"
+       "strings"
+)
+
+const DefaultExpressionType string = "Default"
+
 type ExpressionResolver interface {
+       Expression(expressionStr string) Expression
+       ExpressionFactoryManager() ExpressionFactoryManager
+       SetExpressionFactoryManager(expressionFactoryManager 
ExpressionFactoryManager)
 }
 
 type Expression interface {
+       Value(elContext any) any
+       SetValue(value any, elContext any)
+       ExpressionString() string
+}
+
+type ExpressionFactory interface {
+       CreateExpression(expression string) Expression
 }
 
 type ExpressionFactoryManager struct {
+       expressionFactoryMap map[string]ExpressionFactory
+}
+
+func NewExpressionFactoryManager() *ExpressionFactoryManager {
+       return &ExpressionFactoryManager{
+               expressionFactoryMap: make(map[string]ExpressionFactory),
+       }
+}
+
+func (e *ExpressionFactoryManager) GetExpressionFactory(expressionType string) 
ExpressionFactory {
+       if strings.TrimSpace(expressionType) == "" {
+               expressionType = DefaultExpressionType
+       }
+       return e.expressionFactoryMap[expressionType]
+}
+
+func (e *ExpressionFactoryManager) 
SetExpressionFactoryMap(expressionFactoryMap map[string]ExpressionFactory) {
+       for k, v := range expressionFactoryMap {
+               e.expressionFactoryMap[k] = v
+       }
+}
+
+func (e *ExpressionFactoryManager) PutExpressionFactory(expressionType string, 
factory ExpressionFactory) {
+       e.expressionFactoryMap[expressionType] = factory
+}
+
+type SequenceExpression struct {
+       seqGenerator sequence.SeqGenerator
+       entity       string
+       rule         string
+}
+
+func (s *SequenceExpression) SeqGenerator() sequence.SeqGenerator {
+       return s.seqGenerator
+}
+
+func (s *SequenceExpression) SetSeqGenerator(seqGenerator 
sequence.SeqGenerator) {
+       s.seqGenerator = seqGenerator
+}
+
+func (s *SequenceExpression) Entity() string {
+       return s.entity
+}
+
+func (s *SequenceExpression) SetEntity(entity string) {
+       s.entity = entity
+}
+
+func (s *SequenceExpression) Rule() string {
+       return s.rule
+}
+
+func (s *SequenceExpression) SetRule(rule string) {
+       s.rule = rule
+}
+
+func (s SequenceExpression) Value(elContext any) any {
+       return s.seqGenerator.GenerateId(s.entity, s.rule)
+}
+
+func (s SequenceExpression) SetValue(value any, elContext any) {
+
+}
+
+func (s SequenceExpression) ExpressionString() string {
+       return s.entity + "|" + s.rule
 }
diff --git a/pkg/saga/statemachine/engine/process_ctrl/instruction.go 
b/pkg/saga/statemachine/engine/process_ctrl/instruction.go
deleted file mode 100644
index 2f7c0a54..00000000
--- a/pkg/saga/statemachine/engine/process_ctrl/instruction.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package process_ctrl
-
-import (
-       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
-)
-
-type Instruction interface {
-}
-
-type StateInstruction struct {
-       StateName        string
-       StateMachineName string
-       TenantId         string
-       End              bool
-}
-
-func (s StateInstruction) GetState(context ProcessContext) (statelang.State, 
error) {
-       //TODO implement me
-       panic("implement me")
-}
-
-func NewStateInstruction(stateMachineName string, tenantId string) 
*StateInstruction {
-       return &StateInstruction{StateMachineName: stateMachineName, TenantId: 
tenantId}
-}
diff --git a/pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go 
b/pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go
deleted file mode 100644
index aae9fe89..00000000
--- a/pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go
+++ /dev/null
@@ -1,125 +0,0 @@
-package engine
-
-import (
-       "context"
-       "time"
-
-       "github.com/pkg/errors"
-       "github.com/seata/seata-go/pkg/saga/statemachine/constant"
-       "github.com/seata/seata-go/pkg/saga/statemachine/engine/events"
-       "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
-       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
-)
-
-type ProcessCtrlStateMachineEngine struct {
-       StateMachineConfig StateMachineConfig
-}
-
-func (p ProcessCtrlStateMachineEngine) Start(ctx context.Context, 
stateMachineName string, tenantId string, startParams map[string]interface{}) 
(statelang.StateMachineInstance, error) {
-       return p.startInternal(ctx, stateMachineName, tenantId, "", 
startParams, false, nil)
-}
-
-func (p ProcessCtrlStateMachineEngine) startInternal(ctx context.Context, 
stateMachineName string, tenantId string, businessKey string, startParams 
map[string]interface{}, async bool, callback CallBack) 
(statelang.StateMachineInstance, error) {
-       if tenantId == "" {
-               tenantId = p.StateMachineConfig.DefaultTenantId()
-       }
-
-       stateMachineInstance, err := p.createMachineInstance(stateMachineName, 
tenantId, businessKey, startParams)
-       if err != nil {
-               return nil, err
-       }
-
-       // Build the process_ctrl context.
-       processContextBuilder := NewProcessContextBuilder().
-               WithProcessType(process_ctrl.StateLang).
-               WithOperationName(constant.OperationNameStart).
-               WithAsyncCallback(callback).
-               
WithInstruction(process_ctrl.NewStateInstruction(stateMachineName, tenantId)).
-               WithStateMachineInstance(stateMachineInstance).
-               WithStateMachineConfig(p.StateMachineConfig).
-               WithStateMachineEngine(p).
-               WithIsAsyncExecution(async)
-
-       contextMap := p.copyMap(startParams)
-
-       stateMachineInstance.SetContext(contextMap)
-
-       processContext := 
processContextBuilder.WithStateMachineContextVariables(contextMap).Build()
-
-       if stateMachineInstance.StateMachine().IsPersist() && 
p.StateMachineConfig.StateLogStore() != nil {
-               err := 
p.StateMachineConfig.StateLogStore().RecordStateMachineStarted(ctx, 
stateMachineInstance, processContext)
-               if err != nil {
-                       return nil, err
-               }
-       }
-
-       if stateMachineInstance.ID() == "" {
-               
stateMachineInstance.SetID(p.StateMachineConfig.SeqGenerator().GenerateId(constant.SeqEntityStateMachineInst,
 ""))
-       }
-
-       var eventPublisher events.EventPublisher
-       if async {
-               eventPublisher = p.StateMachineConfig.AsyncEventPublisher()
-       } else {
-               eventPublisher = p.StateMachineConfig.EventPublisher()
-       }
-
-       _, err = eventPublisher.PushEvent(ctx, processContext)
-       if err != nil {
-               return nil, err
-       }
-
-       return stateMachineInstance, nil
-}
-
-// copyMap not deep copy, so best practice: Don’t pass by reference
-func (p ProcessCtrlStateMachineEngine) copyMap(startParams 
map[string]interface{}) map[string]interface{} {
-       copyMap := make(map[string]interface{}, len(startParams))
-       for k, v := range startParams {
-               copyMap[k] = v
-       }
-       return copyMap
-}
-
-func (p ProcessCtrlStateMachineEngine) createMachineInstance(stateMachineName 
string, tenantId string, businessKey string, startParams 
map[string]interface{}) (statelang.StateMachineInstance, error) {
-       stateMachine, err := 
p.StateMachineConfig.StateMachineRepository().GetLastVersionStateMachine(stateMachineName,
 tenantId)
-       if err != nil {
-               return nil, err
-       }
-
-       if stateMachine == nil {
-               return nil, errors.New("StateMachine [" + stateMachineName + "] 
is not exists")
-       }
-
-       stateMachineInstance := statelang.NewStateMachineInstanceImpl()
-       stateMachineInstance.SetStateMachine(stateMachine)
-       stateMachineInstance.SetTenantID(tenantId)
-       stateMachineInstance.SetBusinessKey(businessKey)
-       stateMachineInstance.SetStartParams(startParams)
-       if startParams != nil {
-               if businessKey != "" {
-                       startParams[constant.VarNameBusinesskey] = businessKey
-               }
-
-               if startParams[constant.VarNameParentId] != nil {
-                       parentId, ok := 
startParams[constant.VarNameParentId].(string)
-                       if !ok {
-
-                       }
-                       stateMachineInstance.SetParentID(parentId)
-                       delete(startParams, constant.VarNameParentId)
-               }
-       }
-
-       stateMachineInstance.SetStatus(statelang.RU)
-       stateMachineInstance.SetRunning(true)
-
-       now := time.Now()
-       stateMachineInstance.SetStartedTime(now)
-       stateMachineInstance.SetUpdatedTime(now)
-       return stateMachineInstance, nil
-}
-
-func NewProcessCtrlStateMachineEngine(stateMachineConfig StateMachineConfig) 
*ProcessCtrlStateMachineEngine {
-       return &ProcessCtrlStateMachineEngine{StateMachineConfig: 
stateMachineConfig}
-}
diff --git a/pkg/saga/statemachine/engine/statemachine_engine.go 
b/pkg/saga/statemachine/engine/statemachine_engine.go
deleted file mode 100644
index 6b3954c1..00000000
--- a/pkg/saga/statemachine/engine/statemachine_engine.go
+++ /dev/null
@@ -1,16 +0,0 @@
-package engine
-
-import (
-       "context"
-       "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
-       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
-)
-
-type StateMachineEngine interface {
-       Start(ctx context.Context, stateMachineName string, tenantId string, 
startParams map[string]interface{}) (statelang.StateMachineInstance, error)
-}
-
-type CallBack interface {
-       OnFinished(ctx context.Context, context process_ctrl.ProcessContext, 
stateMachineInstance statelang.StateMachineInstance)
-       OnError(ctx context.Context, context process_ctrl.ProcessContext, 
stateMachineInstance statelang.StateMachineInstance, err error)
-}
diff --git a/pkg/saga/statemachine/engine/statemachine_engine_test.go 
b/pkg/saga/statemachine/engine/statemachine_engine_test.go
deleted file mode 100644
index c9ac2518..00000000
--- a/pkg/saga/statemachine/engine/statemachine_engine_test.go
+++ /dev/null
@@ -1,7 +0,0 @@
-package engine
-
-import "testing"
-
-func TestEngine(t *testing.T) {
-
-}
diff --git a/pkg/saga/statemachine/engine/status_decision/status_decision.go 
b/pkg/saga/statemachine/engine/status_decision/status_decision.go
deleted file mode 100644
index 6def093e..00000000
--- a/pkg/saga/statemachine/engine/status_decision/status_decision.go
+++ /dev/null
@@ -1,4 +0,0 @@
-package status_decision
-
-type StatusDecisionStrategy interface {
-}
diff --git 
a/pkg/saga/statemachine/process_ctrl/handlers/service_task_state_handler.go 
b/pkg/saga/statemachine/process_ctrl/handlers/service_task_state_handler.go
new file mode 100644
index 00000000..28d60d0a
--- /dev/null
+++ b/pkg/saga/statemachine/process_ctrl/handlers/service_task_state_handler.go
@@ -0,0 +1,174 @@
+package handlers
+
+import (
+       "context"
+       "errors"
+       "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/core"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/exception"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
+       seataErrors "github.com/seata/seata-go/pkg/util/errors"
+       "github.com/seata/seata-go/pkg/util/log"
+)
+
+type ServiceTaskStateHandler struct {
+       interceptors []core.StateHandlerInterceptor
+}
+
+func NewServiceTaskStateHandler() *ServiceTaskStateHandler {
+       return &ServiceTaskStateHandler{}
+}
+
+func (s *ServiceTaskStateHandler) State() string {
+       return constant.StateTypeServiceTask
+}
+
+func (s *ServiceTaskStateHandler) Process(ctx context.Context, processContext 
core.ProcessContext) error {
+       stateInstruction, ok := 
processContext.GetInstruction().(core.StateInstruction)
+       if !ok {
+               return errors.New("invalid state instruction from 
processContext")
+       }
+       stateInterface, err := stateInstruction.GetState(processContext)
+       if err != nil {
+               return err
+       }
+       serviceTaskStateImpl, ok := stateInterface.(*state.ServiceTaskStateImpl)
+
+       serviceName := serviceTaskStateImpl.ServiceName()
+       methodName := serviceTaskStateImpl.ServiceMethod()
+       stateInstance, ok := 
processContext.GetVariable(constant.VarNameStateInst).(statelang.StateInstance)
+       if !ok {
+               return errors.New("invalid state instance type from 
processContext")
+       }
+
+       // invoke service task and record
+       var result any
+       var resultErr error
+       handleResultErr := func(err error) {
+               log.Error("<<<<<<<<<<<<<<<<<<<<<< State[%s], ServiceName[%s], 
Method[%s] Execute failed.",
+                       serviceTaskStateImpl.Name(), serviceName, methodName, 
err)
+
+               hierarchicalProcessContext, ok := 
processContext.(core.HierarchicalProcessContext)
+               if !ok {
+                       return
+               }
+               
hierarchicalProcessContext.SetVariable(constant.VarNameCurrentException, err)
+               core.HandleException(processContext, 
serviceTaskStateImpl.AbstractTaskState, err)
+       }
+
+       input, ok := 
processContext.GetVariable(constant.VarNameInputParams).([]any)
+       if !ok {
+               handleResultErr(errors.New("invalid input params type from 
processContext"))
+               return nil
+       }
+
+       stateInstance.SetStatus(statelang.RU)
+       log.Debugf(">>>>>>>>>>>>>>>>>>>>>> Start to execute State[%s], 
ServiceName[%s], Method[%s], Input:%s",
+               serviceTaskStateImpl.Name(), serviceName, methodName, input)
+
+       if _, ok := stateInterface.(state.CompensateSubStateMachineState); ok {
+               // If it is the compensation of the subState machine,
+               // directly call the state machine's compensate method
+               stateMachineEngine, ok := 
processContext.GetVariable(constant.VarNameStateMachineEngine).(core.StateMachineEngine)
+               if !ok {
+                       handleResultErr(errors.New("invalid stateMachineEngine 
type from processContext"))
+                       return nil
+               }
+
+               result, resultErr = s.compensateSubStateMachine(ctx, 
processContext, serviceTaskStateImpl, input,
+                       stateInstance, stateMachineEngine)
+               if resultErr != nil {
+                       handleResultErr(resultErr)
+                       return nil
+               }
+       } else {
+               stateMachineConfig, ok := 
processContext.GetVariable(constant.VarNameStateMachineConfig).(core.StateMachineConfig)
+               if !ok {
+                       handleResultErr(errors.New("invalid stateMachineConfig 
type from processContext"))
+                       return nil
+               }
+
+               serviceInvoker := 
stateMachineConfig.ServiceInvokerManager().ServiceInvoker(serviceTaskStateImpl.ServiceType())
+               if serviceInvoker == nil {
+                       resultErr = 
exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
+                               "No such 
ServiceInvoker["+serviceTaskStateImpl.ServiceType()+"]", nil)
+                       handleResultErr(resultErr)
+                       return nil
+               }
+
+               result, resultErr = serviceInvoker.Invoke(ctx, input, 
serviceTaskStateImpl)
+               if resultErr != nil {
+                       handleResultErr(resultErr)
+                       return nil
+               }
+       }
+
+       log.Debugf("<<<<<<<<<<<<<<<<<<<<<< State[%s], ServiceName[%s], 
Method[%s] Execute finish. result: %s",
+               serviceTaskStateImpl.Name(), serviceName, methodName, result)
+
+       if result != nil {
+               stateInstance.SetOutputParams(result)
+               hierarchicalProcessContext, ok := 
processContext.(core.HierarchicalProcessContext)
+               if !ok {
+                       handleResultErr(errors.New("invalid hierarchical 
process context type from processContext"))
+                       return nil
+               }
+
+               
hierarchicalProcessContext.SetVariable(constant.VarNameOutputParams, result)
+       }
+
+       return nil
+}
+
+func (s *ServiceTaskStateHandler) StateHandlerInterceptorList() 
[]core.StateHandlerInterceptor {
+       return s.interceptors
+}
+
+func (s *ServiceTaskStateHandler) 
RegistryStateHandlerInterceptor(stateHandlerInterceptor 
core.StateHandlerInterceptor) {
+       s.interceptors = append(s.interceptors, stateHandlerInterceptor)
+}
+
+func (s *ServiceTaskStateHandler) compensateSubStateMachine(ctx 
context.Context, processContext core.ProcessContext,
+       serviceTaskState state.ServiceTaskState, input any, instance 
statelang.StateInstance,
+       machineEngine core.StateMachineEngine) (any, error) {
+       subStateMachineParentId, ok := 
processContext.GetVariable(serviceTaskState.Name() + 
constant.VarNameSubMachineParentId).(string)
+       if !ok {
+               return nil, errors.New("invalid subStateMachineParentId type 
from processContext")
+       }
+
+       if subStateMachineParentId == "" {
+               return nil, 
exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
+                       "sub statemachine parentId is required", nil)
+       }
+
+       stateMachineConfig := 
processContext.GetVariable(constant.VarNameStateMachineConfig).(core.StateMachineConfig)
+       subInst, err := 
stateMachineConfig.StateLogStore().GetStateMachineInstanceByParentId(subStateMachineParentId)
+       if err != nil {
+               return nil, err
+       }
+
+       if subInst == nil || len(subInst) == 0 {
+               return nil, 
exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
+                       "cannot find sub statemachine instance by 
parentId:"+subStateMachineParentId, nil)
+       }
+
+       subStateMachineInstId := subInst[0].ID()
+       log.Debugf(">>>>>>>>>>>>>>>>>>>>>> Start to compensate sub statemachine 
[id:%s]", subStateMachineInstId)
+
+       startParams := make(map[string]any)
+
+       if inputList, ok := input.([]any); ok {
+               if len(inputList) > 0 {
+                       startParams = inputList[0].(map[string]any)
+               }
+       } else if inputMap, ok := input.(map[string]any); ok {
+               startParams = inputMap
+       }
+
+       compensateInst, err := machineEngine.Compensate(ctx, 
subStateMachineInstId, startParams)
+       instance.SetStatus(compensateInst.CompensationStatus())
+       log.Debugf("<<<<<<<<<<<<<<<<<<<<<< Compensate sub statemachine [id:%s] 
finished with status[%s], "+"compensateState[%s]",
+               subStateMachineInstId, compensateInst.Status(), 
compensateInst.CompensationStatus())
+       return compensateInst.EndParams(), nil
+}
diff --git a/pkg/saga/statemachine/engine/process_ctrl/process_type.go 
b/pkg/saga/statemachine/process_ctrl/process/process_type.go
similarity index 82%
rename from pkg/saga/statemachine/engine/process_ctrl/process_type.go
rename to pkg/saga/statemachine/process_ctrl/process/process_type.go
index 14027a28..b1a8e030 100644
--- a/pkg/saga/statemachine/engine/process_ctrl/process_type.go
+++ b/pkg/saga/statemachine/process_ctrl/process/process_type.go
@@ -1,4 +1,4 @@
-package process_ctrl
+package process
 
 type ProcessType string
 
diff --git a/pkg/saga/statemachine/statelang/parser/sub_state_machine_parser.go 
b/pkg/saga/statemachine/statelang/parser/sub_state_machine_parser.go
index dc466da5..4de6e5de 100644
--- a/pkg/saga/statemachine/statelang/parser/sub_state_machine_parser.go
+++ b/pkg/saga/statemachine/statelang/parser/sub_state_machine_parser.go
@@ -64,7 +64,7 @@ func NewCompensateSubStateMachineStateParser() 
*CompensateSubStateMachineStatePa
 }
 
 func (c CompensateSubStateMachineStateParser) StateType() string {
-       return constant.CompensateSubMachine
+       return constant.StateTypeCompensateSubMachine
 }
 
 func (c CompensateSubStateMachineStateParser) Parse(stateName string, stateMap 
map[string]interface{}) (statelang.State, error) {
diff --git a/pkg/saga/statemachine/statelang/state/loop_start_state.go 
b/pkg/saga/statemachine/statelang/state/loop_start_state.go
new file mode 100644
index 00000000..e059431d
--- /dev/null
+++ b/pkg/saga/statemachine/statelang/state/loop_start_state.go
@@ -0,0 +1,22 @@
+package state
+
+import (
+       "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+)
+
+type LoopStartState interface {
+       statelang.State
+}
+
+type LoopStartStateImpl struct {
+       *statelang.BaseState
+}
+
+func NewLoopStartStateImpl() *LoopStartStateImpl {
+       baseState := statelang.NewBaseState()
+       baseState.SetType(constant.StateTypeLoopStart)
+       return &LoopStartStateImpl{
+               BaseState: baseState,
+       }
+}
diff --git a/pkg/saga/statemachine/statelang/state/sub_state_machine.go 
b/pkg/saga/statemachine/statelang/state/sub_state_machine.go
index bf0d96d5..78683a0a 100644
--- a/pkg/saga/statemachine/statelang/state/sub_state_machine.go
+++ b/pkg/saga/statemachine/statelang/state/sub_state_machine.go
@@ -56,7 +56,7 @@ func NewCompensateSubStateMachineStateImpl() 
*CompensateSubStateMachineStateImpl
                ServiceTaskStateImpl: NewServiceTaskStateImpl(),
                hashcode:             uuid.String(),
        }
-       c.SetType(constant.CompensateSubMachine)
+       c.SetType(constant.StateTypeCompensateSubMachine)
        return c
 }
 
diff --git a/pkg/saga/statemachine/statelang/state/task_state.go 
b/pkg/saga/statemachine/statelang/state/task_state.go
index 734680ce..bc9880ac 100644
--- a/pkg/saga/statemachine/statelang/state/task_state.go
+++ b/pkg/saga/statemachine/statelang/state/task_state.go
@@ -38,7 +38,7 @@ type Loop interface {
 
 type ExceptionMatch interface {
        Exceptions() []string
-
+       // TODO: go dose not support get reflect.Type by string, not use it now
        ExceptionTypes() []reflect.Type
 
        SetExceptionTypes(ExceptionTypes []reflect.Type)
@@ -79,7 +79,9 @@ type AbstractTaskState struct {
        loop                        Loop
        catches                     []ExceptionMatch
        input                       []interface{}
+       inputExpressions            []interface{}
        output                      map[string]interface{}
+       outputExpressions           map[string]interface{}
        compensatePersistModeUpdate bool
        retryPersistModeUpdate      bool
        forCompensation             bool
@@ -96,6 +98,22 @@ func NewAbstractTaskState() *AbstractTaskState {
        }
 }
 
+func (a *AbstractTaskState) InputExpressions() []interface{} {
+       return a.inputExpressions
+}
+
+func (a *AbstractTaskState) SetInputExpressions(inputExpressions 
[]interface{}) {
+       a.inputExpressions = inputExpressions
+}
+
+func (a *AbstractTaskState) OutputExpressions() map[string]interface{} {
+       return a.outputExpressions
+}
+
+func (a *AbstractTaskState) SetOutputExpressions(outputExpressions 
map[string]interface{}) {
+       a.outputExpressions = outputExpressions
+}
+
 func (a *AbstractTaskState) Input() []interface{} {
        return a.input
 }
diff --git a/pkg/saga/statemachine/statelang/statemachine_instance.go 
b/pkg/saga/statemachine/statelang/statemachine_instance.go
index 399d222b..9da71ee2 100644
--- a/pkg/saga/statemachine/statelang/statemachine_instance.go
+++ b/pkg/saga/statemachine/statelang/statemachine_instance.go
@@ -71,9 +71,9 @@ type StateMachineInstance interface {
 
        SetBusinessKey(businessKey string)
 
-       Error() error
+       Exception() error
 
-       SetError(err error)
+       SetException(err error)
 
        StartParams() map[string]interface{}
 
@@ -83,6 +83,8 @@ type StateMachineInstance interface {
 
        SetEndParams(endParams map[string]interface{})
 
+       Context() map[string]interface{}
+
        PutContext(key string, value interface{})
 
        SetContext(context map[string]interface{})
@@ -115,7 +117,7 @@ type StateMachineInstanceImpl struct {
        startedTime           time.Time
        endTime               time.Time
        updatedTime           time.Time
-       err                   error
+       exception             error
        serializedError       interface{}
        endParams             map[string]interface{}
        serializedEndParams   interface{}
@@ -247,12 +249,12 @@ func (s *StateMachineInstanceImpl) 
SetBusinessKey(businessKey string) {
        s.businessKey = businessKey
 }
 
-func (s *StateMachineInstanceImpl) Error() error {
-       return s.err
+func (s *StateMachineInstanceImpl) Exception() error {
+       return s.exception
 }
 
-func (s *StateMachineInstanceImpl) SetError(err error) {
-       s.err = err
+func (s *StateMachineInstanceImpl) SetException(err error) {
+       s.exception = err
 }
 
 func (s *StateMachineInstanceImpl) StartParams() map[string]interface{} {
@@ -271,6 +273,10 @@ func (s *StateMachineInstanceImpl) SetEndParams(endParams 
map[string]interface{}
        s.endParams = endParams
 }
 
+func (s *StateMachineInstanceImpl) Context() map[string]interface{} {
+       return s.context
+}
+
 func (s *StateMachineInstanceImpl) PutContext(key string, value interface{}) {
        s.contextMutex.Lock()
        defer s.contextMutex.Unlock()
diff --git a/pkg/saga/statemachine/engine/store/db/db.go 
b/pkg/saga/statemachine/store/db/db.go
similarity index 100%
rename from pkg/saga/statemachine/engine/store/db/db.go
rename to pkg/saga/statemachine/store/db/db.go
diff --git a/pkg/saga/statemachine/engine/store/db/db_test.go 
b/pkg/saga/statemachine/store/db/db_test.go
similarity index 100%
rename from pkg/saga/statemachine/engine/store/db/db_test.go
rename to pkg/saga/statemachine/store/db/db_test.go
diff --git a/pkg/saga/statemachine/engine/store/db/statelang.go 
b/pkg/saga/statemachine/store/db/statelang.go
similarity index 100%
rename from pkg/saga/statemachine/engine/store/db/statelang.go
rename to pkg/saga/statemachine/store/db/statelang.go
diff --git a/pkg/saga/statemachine/engine/store/db/statelang_test.go 
b/pkg/saga/statemachine/store/db/statelang_test.go
similarity index 100%
rename from pkg/saga/statemachine/engine/store/db/statelang_test.go
rename to pkg/saga/statemachine/store/db/statelang_test.go
diff --git a/pkg/saga/statemachine/engine/store/db/statelog.go 
b/pkg/saga/statemachine/store/db/statelog.go
similarity index 98%
rename from pkg/saga/statemachine/engine/store/db/statelog.go
rename to pkg/saga/statemachine/store/db/statelog.go
index 26b89ed6..23c3b551 100644
--- a/pkg/saga/statemachine/engine/store/db/statelog.go
+++ b/pkg/saga/statemachine/store/db/statelog.go
@@ -6,7 +6,7 @@ import (
        "fmt"
        "github.com/pkg/errors"
        "github.com/seata/seata-go/pkg/saga/statemachine/constant"
-       "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/core"
        "github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence"
        "github.com/seata/seata-go/pkg/saga/statemachine/engine/serializer"
        "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
@@ -89,7 +89,7 @@ func NewStateLogStore(db *sql.DB, tablePrefix string) 
*StateLogStore {
 }
 
 func (s *StateLogStore) RecordStateMachineStarted(ctx context.Context, 
machineInstance statelang.StateMachineInstance,
-       context process_ctrl.ProcessContext) error {
+       context core.ProcessContext) error {
        if machineInstance == nil {
                return nil
        }
@@ -133,15 +133,15 @@ func (s *StateLogStore) RecordStateMachineStarted(ctx 
context.Context, machineIn
 }
 
 func (s *StateLogStore) RecordStateMachineFinished(ctx context.Context, 
machineInstance statelang.StateMachineInstance,
-       context process_ctrl.ProcessContext) error {
+       context core.ProcessContext) error {
        if machineInstance == nil {
                return nil
        }
 
        endParams := machineInstance.EndParams()
 
-       if statelang.SU == machineInstance.Status() && machineInstance.Error() 
!= nil {
-               machineInstance.SetError(nil)
+       if statelang.SU == machineInstance.Status() && 
machineInstance.Exception() != nil {
+               machineInstance.SetException(nil)
        }
 
        serializedEndParams, err := s.paramsSerializer.Serialize(endParams)
@@ -149,7 +149,7 @@ func (s *StateLogStore) RecordStateMachineFinished(ctx 
context.Context, machineI
                return err
        }
        machineInstance.SetSerializedEndParams(serializedEndParams)
-       serializedError, err := 
s.errorSerializer.Serialize(machineInstance.Error())
+       serializedError, err := 
s.errorSerializer.Serialize(machineInstance.Exception())
        if err != nil {
                return err
        }
@@ -171,7 +171,7 @@ func (s *StateLogStore) RecordStateMachineFinished(ctx 
context.Context, machineI
 }
 
 func (s *StateLogStore) RecordStateMachineRestarted(ctx context.Context, 
machineInstance statelang.StateMachineInstance,
-       context process_ctrl.ProcessContext) error {
+       context core.ProcessContext) error {
        if machineInstance == nil {
                return nil
        }
@@ -190,7 +190,7 @@ func (s *StateLogStore) RecordStateMachineRestarted(ctx 
context.Context, machine
 }
 
 func (s *StateLogStore) RecordStateStarted(ctx context.Context, stateInstance 
statelang.StateInstance,
-       context process_ctrl.ProcessContext) error {
+       context core.ProcessContext) error {
        if stateInstance == nil {
                return nil
        }
@@ -235,7 +235,7 @@ func (s *StateLogStore) RecordStateStarted(ctx 
context.Context, stateInstance st
        return nil
 }
 
-func (s *StateLogStore) isUpdateMode(instance statelang.StateInstance, context 
process_ctrl.ProcessContext) bool {
+func (s *StateLogStore) isUpdateMode(instance statelang.StateInstance, context 
core.ProcessContext) bool {
        //TODO implement me, add forward logic
        return false
 }
@@ -293,7 +293,7 @@ func (s *StateLogStore) getIdIndex(stateInstanceId string, 
separator string) int
 }
 
 func (s *StateLogStore) RecordStateFinished(ctx context.Context, stateInstance 
statelang.StateInstance,
-       context process_ctrl.ProcessContext) error {
+       context core.ProcessContext) error {
        if stateInstance == nil {
                return nil
        }
@@ -372,7 +372,7 @@ func (s *StateLogStore) 
deserializeStateMachineParamsAndException(stateMachineIn
                if err != nil {
                        return err
                }
-               stateMachineInstance.SetError(deserializedError)
+               stateMachineInstance.SetException(deserializedError)
        }
 
        serializedStartParams := stateMachineInstance.SerializedStartParams()
diff --git a/pkg/saga/statemachine/engine/store/db/statelog_test.go 
b/pkg/saga/statemachine/store/db/statelog_test.go
similarity index 94%
rename from pkg/saga/statemachine/engine/store/db/statelog_test.go
rename to pkg/saga/statemachine/store/db/statelog_test.go
index 9044e6df..43767476 100644
--- a/pkg/saga/statemachine/engine/store/db/statelog_test.go
+++ b/pkg/saga/statemachine/store/db/statelog_test.go
@@ -5,19 +5,19 @@ import (
        "fmt"
        "github.com/pkg/errors"
        "github.com/seata/seata-go/pkg/saga/statemachine/constant"
-       "github.com/seata/seata-go/pkg/saga/statemachine/engine"
-       "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/core"
+       "github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl/process"
        "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
        "github.com/stretchr/testify/assert"
        "testing"
        "time"
 )
 
-func mockProcessContext(stateMachineName string, stateMachineInstance 
statelang.StateMachineInstance) process_ctrl.ProcessContext {
-       ctx := engine.NewProcessContextBuilder().
-               WithProcessType(process_ctrl.StateLang).
+func mockProcessContext(stateMachineName string, stateMachineInstance 
statelang.StateMachineInstance) core.ProcessContext {
+       ctx := core.NewProcessContextBuilder().
+               WithProcessType(process.StateLang).
                WithOperationName(constant.OperationNameStart).
-               
WithInstruction(process_ctrl.NewStateInstruction(stateMachineName, "000001")).
+               WithInstruction(core.NewStateInstruction(stateMachineName, 
"000001")).
                WithStateMachineInstance(stateMachineInstance).
                Build()
        return ctx
@@ -55,7 +55,7 @@ func TestStateLogStore_RecordStateMachineStarted(t 
*testing.T) {
        assert.Equal(t, expected.ID(), actual.ID())
        assert.Equal(t, expected.MachineID(), actual.MachineID())
        assert.Equal(t, fmt.Sprint(expected.StartParams()), 
fmt.Sprint(actual.StartParams()))
-       assert.Nil(t, actual.Error())
+       assert.Nil(t, actual.Exception())
        assert.Nil(t, actual.SerializedError())
        assert.Equal(t, expected.Status(), actual.Status())
        assert.Equal(t, expected.StartedTime().UnixNano(), 
actual.StartedTime().UnixNano())
@@ -73,7 +73,7 @@ func TestStateLogStore_RecordStateMachineFinished(t 
*testing.T) {
        err := stateLogStore.RecordStateMachineStarted(context.Background(), 
expected, ctx)
        assert.Nil(t, err)
        expected.SetEndParams(map[string]any{"end": 100})
-       expected.SetError(errors.New("this is a test error"))
+       expected.SetException(errors.New("this is a test error"))
        expected.SetStatus(statelang.FA)
        expected.SetEndTime(time.Now())
        expected.SetRunning(false)
@@ -86,7 +86,7 @@ func TestStateLogStore_RecordStateMachineFinished(t 
*testing.T) {
        assert.Equal(t, expected.ID(), actual.ID())
        assert.Equal(t, expected.MachineID(), actual.MachineID())
        assert.Equal(t, fmt.Sprint(expected.StartParams()), 
fmt.Sprint(actual.StartParams()))
-       assert.Equal(t, "this is a test error", actual.Error().Error())
+       assert.Equal(t, "this is a test error", actual.Exception().Error())
        assert.Equal(t, expected.Status(), actual.Status())
        assert.Equal(t, expected.IsRunning(), actual.IsRunning())
        assert.Equal(t, expected.StartedTime().UnixNano(), 
actual.StartedTime().UnixNano())
@@ -240,7 +240,7 @@ func 
TestStateLogStore_GetStateMachineInstanceByBusinessKey(t *testing.T) {
        assert.Equal(t, expected.ID(), actual.ID())
        assert.Equal(t, expected.MachineID(), actual.MachineID())
        assert.Equal(t, fmt.Sprint(expected.StartParams()), 
fmt.Sprint(actual.StartParams()))
-       assert.Nil(t, actual.Error())
+       assert.Nil(t, actual.Exception())
        assert.Nil(t, actual.SerializedError())
        assert.Equal(t, expected.Status(), actual.Status())
        assert.Equal(t, expected.StartedTime().UnixNano(), 
actual.StartedTime().UnixNano())
@@ -269,9 +269,9 @@ func TestStateLogStore_GetStateMachineInstanceByParentId(t 
*testing.T) {
        actual := actualList[0]
        assert.Equal(t, expected.ID(), actual.ID())
        assert.Equal(t, expected.MachineID(), actual.MachineID())
-       // no startParams, endParams and Error
+       // no startParams, endParams and Exception
        assert.NotEqual(t, fmt.Sprint(expected.StartParams()), 
fmt.Sprint(actual.StartParams()))
-       assert.Nil(t, actual.Error())
+       assert.Nil(t, actual.Exception())
        assert.Nil(t, actual.SerializedError())
        assert.Equal(t, expected.Status(), actual.Status())
        assert.Equal(t, expected.StartedTime().UnixNano(), 
actual.StartedTime().UnixNano())
diff --git a/pkg/saga/statemachine/store/repository/state_machine_repository.go 
b/pkg/saga/statemachine/store/repository/state_machine_repository.go
new file mode 100644
index 00000000..8a62cc49
--- /dev/null
+++ b/pkg/saga/statemachine/store/repository/state_machine_repository.go
@@ -0,0 +1,34 @@
+package repository
+
+import (
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+       "io"
+)
+
+type StateMachineRepositoryImpl struct {
+}
+
+func (s StateMachineRepositoryImpl) GetStateMachineById(stateMachineId string) 
(statelang.StateMachine, error) {
+       //TODO implement me
+       panic("implement me")
+}
+
+func (s StateMachineRepositoryImpl) 
GetStateMachineByNameAndTenantId(stateMachineName string, tenantId string) 
(statelang.StateMachine, error) {
+       //TODO implement me
+       panic("implement me")
+}
+
+func (s StateMachineRepositoryImpl) 
GetLastVersionStateMachine(stateMachineName string, tenantId string) 
(statelang.StateMachine, error) {
+       //TODO implement me
+       panic("implement me")
+}
+
+func (s StateMachineRepositoryImpl) RegistryStateMachine(machine 
statelang.StateMachine) error {
+       //TODO implement me
+       panic("implement me")
+}
+
+func (s StateMachineRepositoryImpl) RegistryStateMachineByReader(reader 
io.Reader) error {
+       //TODO implement me
+       panic("implement me")
+}
diff --git a/pkg/tm/transaction_executor_test.go 
b/pkg/tm/transaction_executor_test.go
index f0e513ad..c9bd1084 100644
--- a/pkg/tm/transaction_executor_test.go
+++ b/pkg/tm/transaction_executor_test.go
@@ -361,7 +361,7 @@ func TestBeginNewGtx(t *testing.T) {
        assert.Equal(t, message.GlobalStatusBegin, *GetTxStatus(ctx))
 
        // case return error
-       err := errors.New("Mock Error")
+       err := errors.New("Mock Exception")
        gomonkey.ApplyMethod(reflect.TypeOf(GetGlobalTransactionManager()), 
"Begin",
                func(_ *GlobalTransactionManager, ctx context.Context, timeout 
time.Duration) error {
                        return err
diff --git a/pkg/util/collection/collection.go 
b/pkg/util/collection/collection.go
index 5149e1a2..4dc04f7c 100644
--- a/pkg/util/collection/collection.go
+++ b/pkg/util/collection/collection.go
@@ -17,7 +17,10 @@
 
 package collection
 
-import "strings"
+import (
+       "container/list"
+       "strings"
+)
 
 const (
        KvSplit   = "="
@@ -81,3 +84,42 @@ func DecodeMap(data []byte) map[string]string {
 
        return ctxMap
 }
+
+type Stack struct {
+       list *list.List
+}
+
+func NewStack() *Stack {
+       list := list.New()
+       return &Stack{list}
+}
+
+func (stack *Stack) Push(value interface{}) {
+       stack.list.PushBack(value)
+}
+
+func (stack *Stack) Pop() interface{} {
+       e := stack.list.Back()
+       if e != nil {
+               stack.list.Remove(e)
+               return e.Value
+       }
+       return nil
+}
+
+func (stack *Stack) Peak() interface{} {
+       e := stack.list.Back()
+       if e != nil {
+               return e.Value
+       }
+
+       return nil
+}
+
+func (stack *Stack) Len() int {
+       return stack.list.Len()
+}
+
+func (stack *Stack) Empty() bool {
+       return stack.list.Len() == 0
+}
diff --git a/pkg/util/collection/collection_test.go 
b/pkg/util/collection/collection_test.go
index 74df03cd..10a9e20f 100644
--- a/pkg/util/collection/collection_test.go
+++ b/pkg/util/collection/collection_test.go
@@ -57,3 +57,26 @@ func TestEncodeDecodeMap(t *testing.T) {
                })
        }
 }
+
+func TestStack(t *testing.T) {
+       stack := NewStack()
+       stack.Push(1)
+       stack.Push(2)
+       stack.Push(3)
+       stack.Push(4)
+
+       len := stack.Len()
+       if len != 4 {
+               t.Errorf("stack.Len() failed. Got %d, expected 4.", len)
+       }
+
+       value := stack.Peak().(int)
+       if value != 4 {
+               t.Errorf("stack.Peak() failed. Got %d, expected 4.", value)
+       }
+
+       value = stack.Pop().(int)
+       if value != 4 {
+               t.Errorf("stack.Pop() failed. Got %d, expected 4.", value)
+       }
+}
diff --git a/pkg/util/errors/code.go b/pkg/util/errors/code.go
index dfe50205..9b15e72a 100644
--- a/pkg/util/errors/code.go
+++ b/pkg/util/errors/code.go
@@ -100,4 +100,15 @@ const (
 
        // FencePhaseError have fence phase but is not illegal value
        FencePhaseError
+
+       // ObjectNotExists object not exists
+       ObjectNotExists
+       // StateMachineInstanceNotExists State machine instance not exists
+       StateMachineInstanceNotExists
+       // ContextVariableReplayFailed Context variable replay failed
+       ContextVariableReplayFailed
+       // InvalidParameter Context variable replay failed
+       InvalidParameter
+       // OperationDenied Operation denied
+       OperationDenied
 )


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to