This is an automated email from the ASF dual-hosted git repository.
yixia pushed a commit to branch feature/saga
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git
The following commit(s) were added to refs/heads/feature/saga by this push:
new 8d68ae44 feature: sequential execution of state machine in Saga (#681)
8d68ae44 is described below
commit 8d68ae4494db947daa06cc503f8d532bc9e2c908
Author: A Cabbage <[email protected]>
AuthorDate: Thu Jun 27 16:44:28 2024 +0800
feature: sequential execution of state machine in Saga (#681)
---
go.mod | 1 +
go.sum | 2 +
pkg/saga/statemachine/constant/constant.go | 93 +++--
.../{process_ctrl => core}/bussiness_processor.go | 27 +-
.../engine/core/compensation_holder.go | 63 +++
.../engine/core/default_statemachine_config.go | 199 ++++++++++
pkg/saga/statemachine/engine/core/engine_utils.go | 149 ++++++++
.../statemachine/engine/{events => core}/event.go | 2 +-
pkg/saga/statemachine/engine/core/event_bus.go | 113 ++++++
.../engine/{events => core}/event_consumer.go | 15 +-
.../engine/{events => core}/event_publisher.go | 2 +-
pkg/saga/statemachine/engine/core/instruction.go | 96 +++++
.../engine/core/loop_context_holder.go | 92 +++++
.../statemachine/engine/core/loop_task_utils.go | 40 ++
.../statemachine/engine/core/parameter_utils.go | 132 +++++++
.../{process_ctrl => core}/process_context.go | 2 +-
.../statemachine/engine/core/process_controller.go | 31 ++
.../core/process_ctrl_statemachine_engine.go | 424 +++++++++++++++++++++
.../statemachine/engine/core/process_router.go | 194 ++++++++++
.../process_state.go} | 51 +--
pkg/saga/statemachine/engine/core/state_router.go | 151 ++++++++
.../engine/{ => core}/statemachine_config.go | 22 +-
.../engine/core/statemachine_engine.go | 16 +
.../engine/core/statemachine_engine_test.go | 15 +
.../engine/{store => core}/statemachine_store.go | 15 +-
.../statemachine/engine/core/status_decision.go | 188 +++++++++
pkg/saga/statemachine/engine/{ => core}/utils.go | 14 +-
.../engine/default_statemachine_config.go | 123 ------
pkg/saga/statemachine/engine/events/event_bus.go | 49 ---
.../statemachine/engine/exception/exception.go | 54 +++
pkg/saga/statemachine/engine/expr/expression.go | 83 ++++
.../engine/process_ctrl/instruction.go | 24 --
.../engine/process_ctrl_statemachine_engine.go | 125 ------
.../statemachine/engine/statemachine_engine.go | 16 -
.../engine/statemachine_engine_test.go | 7 -
.../engine/status_decision/status_decision.go | 4 -
.../handlers/service_task_state_handler.go | 174 +++++++++
.../process}/process_type.go | 2 +-
.../statelang/parser/sub_state_machine_parser.go | 2 +-
.../statelang/state/loop_start_state.go | 22 ++
.../statelang/state/sub_state_machine.go | 2 +-
.../statemachine/statelang/state/task_state.go | 20 +-
.../statelang/statemachine_instance.go | 20 +-
pkg/saga/statemachine/{engine => }/store/db/db.go | 0
.../statemachine/{engine => }/store/db/db_test.go | 0
.../{engine => }/store/db/statelang.go | 0
.../{engine => }/store/db/statelang_test.go | 0
.../statemachine/{engine => }/store/db/statelog.go | 22 +-
.../{engine => }/store/db/statelog_test.go | 24 +-
.../store/repository/state_machine_repository.go | 34 ++
pkg/tm/transaction_executor_test.go | 2 +-
pkg/util/collection/collection.go | 44 ++-
pkg/util/collection/collection_test.go | 23 ++
pkg/util/errors/code.go | 11 +
54 files changed, 2533 insertions(+), 503 deletions(-)
diff --git a/go.mod b/go.mod
index d24d93c3..c927882f 100644
--- a/go.mod
+++ b/go.mod
@@ -34,6 +34,7 @@ require (
github.com/agiledragon/gomonkey v2.0.2+incompatible
github.com/agiledragon/gomonkey/v2 v2.9.0
github.com/mattn/go-sqlite3 v1.14.19
+ golang.org/x/sync v0.6.0
google.golang.org/protobuf v1.30.0
)
diff --git a/go.sum b/go.sum
index 9c0fbc0b..e269c87d 100644
--- a/go.sum
+++ b/go.sum
@@ -942,6 +942,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod
h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
+golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/pkg/saga/statemachine/constant/constant.go
b/pkg/saga/statemachine/constant/constant.go
index ef332c18..a089e7d5 100644
--- a/pkg/saga/statemachine/constant/constant.go
+++ b/pkg/saga/statemachine/constant/constant.go
@@ -1,29 +1,72 @@
package constant
const (
- VarNameProcessType string = "_ProcessType_"
- VarNameOperationName string = "_operation_name_"
- OperationNameStart string = "start"
- VarNameAsyncCallback string = "_async_callback_"
- VarNameStateMachineInst string =
"_current_statemachine_instance_"
- VarNameStateMachine string = "_current_statemachine_"
- VarNameStateMachineEngine string =
"_current_statemachine_engine_"
- VarNameStateMachineConfig string = "_statemachine_config_"
- VarNameStateMachineContext string = "context"
- VarNameIsAsyncExecution string = "_is_async_execution_"
- VarNameStateInst string = "_current_state_instance_"
- SeqEntityStateMachineInst string = "STATE_MACHINE_INST"
- SeqEntityStateInst string = "STATE_INST"
- VarNameBusinesskey string = "_business_key_"
- VarNameParentId string = "_parent_id_"
- StateTypeServiceTask string = "ServiceTask"
- StateTypeChoice string = "Choice"
- StateTypeSubStateMachine string = "SubStateMachine"
- CompensateSubMachine string = "CompensateSubMachine"
- StateTypeSucceed string = "Succeed"
- StateTypeFail string = "Fail"
- StateTypeCompensationTrigger string = "CompensationTrigger"
- StateTypeScriptTask string = "ScriptTask"
- CompensateSubMachineStateNamePrefix string =
"_compensate_sub_machine_state_"
- DefaultScriptType string = "groovy"
+ // region State Types
+ StateTypeServiceTask string = "ServiceTask"
+ StateTypeChoice string = "Choice"
+ StateTypeFail string = "Fail"
+ StateTypeSucceed string = "Succeed"
+ StateTypeCompensationTrigger string = "CompensationTrigger"
+ StateTypeSubStateMachine string = "SubStateMachine"
+ StateTypeCompensateSubMachine string = "CompensateSubMachine"
+ StateTypeScriptTask string = "ScriptTask"
+ StateTypeLoopStart string = "LoopStart"
+ // end region
+
+ // region Service Types
+ ServiceTypeGRPC string = "GRPC"
+ // end region
+
+ // region System Variables
+ VarNameOutputParams string = "outputParams"
+ VarNameProcessType string = "_ProcessType_"
+ VarNameOperationName string = "_operation_name_"
+ OperationNameStart string = "start"
+ OperationNameCompensate string = "compensate"
+ VarNameAsyncCallback string = "_async_callback_"
+ VarNameCurrentExceptionRoute string =
"_current_exception_route_"
+ VarNameIsExceptionNotCatch string = "_is_exception_not_catch_"
+ VarNameSubMachineParentId string = "_sub_machine_parent_id_"
+ VarNameCurrentChoice string = "_current_choice_"
+ VarNameStateMachineInst string =
"_current_statemachine_instance_"
+ VarNameStateMachine string = "_current_statemachine_"
+ VarNameStateMachineEngine string =
"_current_statemachine_engine_"
+ VarNameStateMachineConfig string = "_statemachine_config_"
+ VarNameStateMachineContext string = "context"
+ VarNameIsAsyncExecution string = "_is_async_execution_"
+ VarNameStateInst string = "_current_state_instance_"
+ VarNameBusinesskey string = "_business_key_"
+ VarNameParentId string = "_parent_id_"
+ VarNameCurrentException string = "currentException"
+ CompensateSubMachineStateNamePrefix string =
"_compensate_sub_machine_state_"
+ DefaultScriptType string = "groovy"
+ VarNameSyncExeStack string = "_sync_execution_stack_"
+ VarNameInputParams string = "inputParams"
+ VarNameIsLoopState string = "_is_loop_state_"
+ VarNameCurrentCompensateTriggerState string = "_is_compensating_"
+ VarNameCurrentCompensationHolder string =
"_current_compensation_holder_"
+ VarNameFirstCompensationStateStarted string =
"_first_compensation_state_started"
+ VarNameCurrentLoopContextHolder string =
"_current_loop_context_holder_"
+ // TODO: this lock in process context only has one, try to add more to
add concurrent
+ VarNameProcessContextMutexLock string = "_current_context_mutex_lock"
+ VarNameFailEndStateFlag string = "_fail_end_state_flag_"
+ // end region
+
+ // region of loop
+ LoopCounter string = "loopCounter"
+ LoopSemaphore string = "loopSemaphore"
+ LoopResult string = "loopResult"
+ NumberOfInstances string = "nrOfInstances"
+ NumberOfActiveInstances string = "nrOfActiveInstances"
+ NumberOfCompletedInstances string = "nrOfCompletedInstances"
+ // end region
+
+ // region others
+ SeqEntityStateMachineInst string = "STATE_MACHINE_INST"
+ SeqEntityStateInst string = "STATE_INST"
+ OperationNameForward string = "forward"
+ LoopStateNamePattern string = "-loop-"
+ // end region
+
+ SeperatorParentId string = ":"
)
diff --git a/pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go
b/pkg/saga/statemachine/engine/core/bussiness_processor.go
similarity index 79%
rename from pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go
rename to pkg/saga/statemachine/engine/core/bussiness_processor.go
index c12a7c79..6b0a2bb4 100644
--- a/pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go
+++ b/pkg/saga/statemachine/engine/core/bussiness_processor.go
@@ -1,9 +1,10 @@
-package process_ctrl
+package core
import (
"context"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
+ "github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl/process"
"sync"
)
@@ -19,14 +20,14 @@ type DefaultBusinessProcessor struct {
mu sync.RWMutex
}
-func (d *DefaultBusinessProcessor) RegistryProcessHandler(processType
ProcessType, processHandler ProcessHandler) {
+func (d *DefaultBusinessProcessor) RegistryProcessHandler(processType
process.ProcessType, processHandler ProcessHandler) {
d.mu.Lock()
defer d.mu.Unlock()
d.processHandlers[string(processType)] = processHandler
}
-func (d *DefaultBusinessProcessor) RegistryRouterHandler(processType
ProcessType, routerHandler RouterHandler) {
+func (d *DefaultBusinessProcessor) RegistryRouterHandler(processType
process.ProcessType, routerHandler RouterHandler) {
d.mu.Lock()
defer d.mu.Unlock()
@@ -55,17 +56,17 @@ func (d *DefaultBusinessProcessor) Route(ctx
context.Context, processContext Pro
return routerHandler.Route(ctx, processContext)
}
-func (d *DefaultBusinessProcessor) getProcessHandler(processType ProcessType)
(ProcessHandler, error) {
+func (d *DefaultBusinessProcessor) getProcessHandler(processType
process.ProcessType) (ProcessHandler, error) {
d.mu.RLock()
defer d.mu.RUnlock()
processHandler, ok := d.processHandlers[string(processType)]
if !ok {
- return nil, errors.New("Cannot find process handler by type " +
string(processType))
+ return nil, errors.New("Cannot find Process handler by type " +
string(processType))
}
return processHandler, nil
}
-func (d *DefaultBusinessProcessor) getRouterHandler(processType ProcessType)
(RouterHandler, error) {
+func (d *DefaultBusinessProcessor) getRouterHandler(processType
process.ProcessType) (RouterHandler, error) {
d.mu.RLock()
defer d.mu.RUnlock()
routerHandler, ok := d.routerHandlers[string(processType)]
@@ -75,18 +76,10 @@ func (d *DefaultBusinessProcessor)
getRouterHandler(processType ProcessType) (Ro
return routerHandler, nil
}
-func (d *DefaultBusinessProcessor) matchProcessType(processContext
ProcessContext) ProcessType {
+func (d *DefaultBusinessProcessor) matchProcessType(processContext
ProcessContext) process.ProcessType {
ok := processContext.HasVariable(constant.VarNameProcessType)
if ok {
- return
processContext.GetVariable(constant.VarNameProcessType).(ProcessType)
+ return
processContext.GetVariable(constant.VarNameProcessType).(process.ProcessType)
}
- return StateLang
-}
-
-type ProcessHandler interface {
- Process(ctx context.Context, processContext ProcessContext) error
-}
-
-type RouterHandler interface {
- Route(ctx context.Context, processContext ProcessContext) error
+ return process.StateLang
}
diff --git a/pkg/saga/statemachine/engine/core/compensation_holder.go
b/pkg/saga/statemachine/engine/core/compensation_holder.go
new file mode 100644
index 00000000..562cb020
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/compensation_holder.go
@@ -0,0 +1,63 @@
+package core
+
+import (
+ "context"
+ "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+ "github.com/seata/seata-go/pkg/util/collection"
+ "sync"
+)
+
+type CompensationHolder struct {
+ statesNeedCompensation *sync.Map
+ statesForCompensation *sync.Map
+ stateStackNeedCompensation *collection.Stack
+}
+
+func (c *CompensationHolder) StatesNeedCompensation() *sync.Map {
+ return c.statesNeedCompensation
+}
+
+func (c *CompensationHolder) SetStatesNeedCompensation(statesNeedCompensation
*sync.Map) {
+ c.statesNeedCompensation = statesNeedCompensation
+}
+
+func (c *CompensationHolder) StatesForCompensation() *sync.Map {
+ return c.statesForCompensation
+}
+
+func (c *CompensationHolder) SetStatesForCompensation(statesForCompensation
*sync.Map) {
+ c.statesForCompensation = statesForCompensation
+}
+
+func (c *CompensationHolder) StateStackNeedCompensation() *collection.Stack {
+ return c.stateStackNeedCompensation
+}
+
+func (c *CompensationHolder)
SetStateStackNeedCompensation(stateStackNeedCompensation *collection.Stack) {
+ c.stateStackNeedCompensation = stateStackNeedCompensation
+}
+
+func (c *CompensationHolder) AddToBeCompensatedState(stateName string,
toBeCompensatedState statelang.StateInstance) {
+ c.statesNeedCompensation.Store(stateName, toBeCompensatedState)
+}
+
+func NewCompensationHolder() *CompensationHolder {
+ return &CompensationHolder{
+ statesNeedCompensation: &sync.Map{},
+ statesForCompensation: &sync.Map{},
+ stateStackNeedCompensation: collection.NewStack(),
+ }
+}
+
+func GetCurrentCompensationHolder(ctx context.Context, processContext
ProcessContext, forceCreate bool) *CompensationHolder {
+ compensationholder :=
processContext.GetVariable(constant.VarNameCurrentCompensationHolder).(*CompensationHolder)
+ lock :=
processContext.GetVariable(constant.VarNameProcessContextMutexLock).(*sync.Mutex)
+ lock.Lock()
+ defer lock.Unlock()
+ if compensationholder == nil && forceCreate {
+ compensationholder = NewCompensationHolder()
+
processContext.SetVariable(constant.VarNameCurrentCompensationHolder,
compensationholder)
+ }
+ return compensationholder
+}
diff --git a/pkg/saga/statemachine/engine/core/default_statemachine_config.go
b/pkg/saga/statemachine/engine/core/default_statemachine_config.go
new file mode 100644
index 00000000..c7692696
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/default_statemachine_config.go
@@ -0,0 +1,199 @@
+package core
+
+import (
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/expr"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/invoker"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence"
+ "sync"
+)
+
+const (
+ DefaultTransOperTimeout = 60000 * 30
+ DefaultServiceInvokeTimeout = 60000 * 5
+)
+
+type DefaultStateMachineConfig struct {
+ // Configuration
+ transOperationTimeout int
+ serviceInvokeTimeout int
+ charset string
+ defaultTenantId string
+
+ // Components
+
+ // Event publisher
+ syncProcessCtrlEventPublisher EventPublisher
+ asyncProcessCtrlEventPublisher EventPublisher
+
+ // Store related components
+ stateLogRepository StateLogRepository
+ stateLogStore StateLogStore
+ stateLangStore StateLangStore
+ stateMachineRepository StateMachineRepository
+
+ // Expression related components
+ expressionFactoryManager expr.ExpressionFactoryManager
+ expressionResolver expr.ExpressionResolver
+
+ // Invoker related components
+ serviceInvokerManager invoker.ServiceInvokerManager
+ scriptInvokerManager invoker.ScriptInvokerManager
+
+ // Other components
+ statusDecisionStrategy StatusDecisionStrategy
+ seqGenerator sequence.SeqGenerator
+ componentLock *sync.Mutex
+}
+
+func (c *DefaultStateMachineConfig) ComponentLock() *sync.Mutex {
+ return c.componentLock
+}
+
+func (c *DefaultStateMachineConfig) SetComponentLock(componentLock
*sync.Mutex) {
+ c.componentLock = componentLock
+}
+
+func (c *DefaultStateMachineConfig)
SetTransOperationTimeout(transOperationTimeout int) {
+ c.transOperationTimeout = transOperationTimeout
+}
+
+func (c *DefaultStateMachineConfig)
SetServiceInvokeTimeout(serviceInvokeTimeout int) {
+ c.serviceInvokeTimeout = serviceInvokeTimeout
+}
+
+func (c *DefaultStateMachineConfig) SetCharset(charset string) {
+ c.charset = charset
+}
+
+func (c *DefaultStateMachineConfig) SetDefaultTenantId(defaultTenantId string)
{
+ c.defaultTenantId = defaultTenantId
+}
+
+func (c *DefaultStateMachineConfig)
SetSyncProcessCtrlEventPublisher(syncProcessCtrlEventPublisher EventPublisher) {
+ c.syncProcessCtrlEventPublisher = syncProcessCtrlEventPublisher
+}
+
+func (c *DefaultStateMachineConfig)
SetAsyncProcessCtrlEventPublisher(asyncProcessCtrlEventPublisher
EventPublisher) {
+ c.asyncProcessCtrlEventPublisher = asyncProcessCtrlEventPublisher
+}
+
+func (c *DefaultStateMachineConfig) SetStateLogRepository(stateLogRepository
StateLogRepository) {
+ c.stateLogRepository = stateLogRepository
+}
+
+func (c *DefaultStateMachineConfig) SetStateLogStore(stateLogStore
StateLogStore) {
+ c.stateLogStore = stateLogStore
+}
+
+func (c *DefaultStateMachineConfig) SetStateLangStore(stateLangStore
StateLangStore) {
+ c.stateLangStore = stateLangStore
+}
+
+func (c *DefaultStateMachineConfig)
SetStateMachineRepository(stateMachineRepository StateMachineRepository) {
+ c.stateMachineRepository = stateMachineRepository
+}
+
+func (c *DefaultStateMachineConfig)
SetExpressionFactoryManager(expressionFactoryManager
expr.ExpressionFactoryManager) {
+ c.expressionFactoryManager = expressionFactoryManager
+}
+
+func (c *DefaultStateMachineConfig) SetExpressionResolver(expressionResolver
expr.ExpressionResolver) {
+ c.expressionResolver = expressionResolver
+}
+
+func (c *DefaultStateMachineConfig)
SetServiceInvokerManager(serviceInvokerManager invoker.ServiceInvokerManager) {
+ c.serviceInvokerManager = serviceInvokerManager
+}
+
+func (c *DefaultStateMachineConfig)
SetScriptInvokerManager(scriptInvokerManager invoker.ScriptInvokerManager) {
+ c.scriptInvokerManager = scriptInvokerManager
+}
+
+func (c *DefaultStateMachineConfig)
SetStatusDecisionStrategy(statusDecisionStrategy StatusDecisionStrategy) {
+ c.statusDecisionStrategy = statusDecisionStrategy
+}
+
+func (c *DefaultStateMachineConfig) SetSeqGenerator(seqGenerator
sequence.SeqGenerator) {
+ c.seqGenerator = seqGenerator
+}
+
+func (c *DefaultStateMachineConfig) StateLogRepository() StateLogRepository {
+ return c.stateLogRepository
+}
+
+func (c *DefaultStateMachineConfig) StateMachineRepository()
StateMachineRepository {
+ return c.stateMachineRepository
+}
+
+func (c *DefaultStateMachineConfig) StateLogStore() StateLogStore {
+ return c.stateLogStore
+}
+
+func (c *DefaultStateMachineConfig) StateLangStore() StateLangStore {
+ return c.stateLangStore
+}
+
+func (c *DefaultStateMachineConfig) ExpressionFactoryManager()
expr.ExpressionFactoryManager {
+ return c.expressionFactoryManager
+}
+
+func (c *DefaultStateMachineConfig) ExpressionResolver()
expr.ExpressionResolver {
+ return c.expressionResolver
+}
+
+func (c *DefaultStateMachineConfig) SeqGenerator() sequence.SeqGenerator {
+ return c.seqGenerator
+}
+
+func (c *DefaultStateMachineConfig) StatusDecisionStrategy()
StatusDecisionStrategy {
+ return c.statusDecisionStrategy
+}
+
+func (c *DefaultStateMachineConfig) EventPublisher() EventPublisher {
+ return c.syncProcessCtrlEventPublisher
+}
+
+func (c *DefaultStateMachineConfig) AsyncEventPublisher() EventPublisher {
+ return c.asyncProcessCtrlEventPublisher
+}
+
+func (c *DefaultStateMachineConfig) ServiceInvokerManager()
invoker.ServiceInvokerManager {
+ return c.serviceInvokerManager
+}
+
+func (c *DefaultStateMachineConfig) ScriptInvokerManager()
invoker.ScriptInvokerManager {
+ return c.scriptInvokerManager
+}
+
+func (c *DefaultStateMachineConfig) CharSet() string {
+ return c.charset
+}
+
+func (c *DefaultStateMachineConfig) SetCharSet(charset string) {
+ c.charset = charset
+}
+
+func (c *DefaultStateMachineConfig) DefaultTenantId() string {
+ return c.defaultTenantId
+}
+
+func (c *DefaultStateMachineConfig) TransOperationTimeout() int {
+ return c.transOperationTimeout
+}
+
+func (c *DefaultStateMachineConfig) ServiceInvokeTimeout() int {
+ return c.serviceInvokeTimeout
+}
+
+func NewDefaultStateMachineConfig() *DefaultStateMachineConfig {
+ c := &DefaultStateMachineConfig{
+ transOperationTimeout: DefaultTransOperTimeout,
+ serviceInvokeTimeout: DefaultServiceInvokeTimeout,
+ charset: "UTF-8",
+ defaultTenantId: "000001",
+ componentLock: &sync.Mutex{},
+ }
+
+ // TODO: init config
+ return c
+}
diff --git a/pkg/saga/statemachine/engine/core/engine_utils.go
b/pkg/saga/statemachine/engine/core/engine_utils.go
new file mode 100644
index 00000000..e1dcd4de
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/engine_utils.go
@@ -0,0 +1,149 @@
+package core
+
+import (
+ "context"
+ "errors"
+ "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
+ "github.com/seata/seata-go/pkg/util/log"
+ "golang.org/x/sync/semaphore"
+ "reflect"
+ "strings"
+ "sync"
+ "time"
+)
+
+func EndStateMachine(ctx context.Context, processContext ProcessContext) error
{
+ if processContext.HasVariable(constant.VarNameIsLoopState) {
+ if processContext.HasVariable(constant.LoopSemaphore) {
+ weighted, ok :=
processContext.GetVariable(constant.LoopSemaphore).(semaphore.Weighted)
+ if !ok {
+ return errors.New("semaphore type is not
weighted")
+ }
+ weighted.Release(1)
+ }
+ }
+
+ stateMachineInstance, ok :=
processContext.GetVariable(constant.VarNameStateMachineInst).(statelang.StateMachineInstance)
+ if !ok {
+ return errors.New("state machine instance type is not
statelang.StateMachineInstance")
+ }
+
+ stateMachineInstance.SetEndTime(time.Now())
+
+ exp, ok :=
processContext.GetVariable(constant.VarNameCurrentException).(error)
+ if !ok {
+ return errors.New("exception type is not error")
+ }
+
+ if exp != nil {
+ stateMachineInstance.SetException(exp)
+ log.Debugf("Exception Occurred: %s", exp)
+ }
+
+ stateMachineConfig, ok :=
processContext.GetVariable(constant.VarNameStateMachineConfig).(StateMachineConfig)
+
+ if err :=
stateMachineConfig.StatusDecisionStrategy().DecideOnEndState(ctx,
processContext, stateMachineInstance, exp); err != nil {
+ return err
+ }
+
+ contextParams, ok :=
processContext.GetVariable(constant.VarNameStateMachineContext).(map[string]interface{})
+ if !ok {
+ return errors.New("state machine context type is not
map[string]interface{}")
+ }
+ endParams := stateMachineInstance.EndParams()
+ for k, v := range contextParams {
+ endParams[k] = v
+ }
+ stateMachineInstance.SetEndParams(endParams)
+
+ stateInstruction, ok :=
processContext.GetInstruction().(StateInstruction)
+ if !ok {
+ return errors.New("state instruction type is not
process_ctrl.StateInstruction")
+ }
+ stateInstruction.SetEnd(true)
+
+ stateMachineInstance.SetRunning(false)
+ stateMachineInstance.SetEndTime(time.Now())
+
+ if stateMachineInstance.StateMachine().IsPersist() &&
stateMachineConfig.StateLangStore() != nil {
+ err :=
stateMachineConfig.StateLogStore().RecordStateMachineFinished(ctx,
stateMachineInstance, processContext)
+ if err != nil {
+ return err
+ }
+ }
+
+ callBack, ok :=
processContext.GetVariable(constant.VarNameAsyncCallback).(CallBack)
+ if ok {
+ if exp != nil {
+ callBack.OnError(ctx, processContext,
stateMachineInstance, exp)
+ } else {
+ callBack.OnFinished(ctx, processContext,
stateMachineInstance)
+ }
+ }
+
+ return nil
+}
+
+func HandleException(processContext ProcessContext, abstractTaskState
*state.AbstractTaskState, err error) {
+ catches := abstractTaskState.Catches()
+ if catches != nil && len(catches) != 0 {
+ for _, exceptionMatch := range catches {
+ exceptions := exceptionMatch.Exceptions()
+ exceptionTypes := exceptionMatch.ExceptionTypes()
+ if exceptions != nil && len(exceptions) != 0 {
+ if exceptionTypes == nil {
+ lock :=
processContext.GetVariable(constant.VarNameProcessContextMutexLock).(*sync.Mutex)
+ lock.Lock()
+ defer lock.Unlock()
+ error := errors.New("")
+ for i := 0; i < len(exceptions); i++ {
+ exceptionTypes =
append(exceptionTypes, reflect.TypeOf(error))
+ }
+ }
+
+ exceptionMatch.SetExceptionTypes(exceptionTypes)
+ }
+
+ for i, _ := range exceptionTypes {
+ if reflect.TypeOf(err) == exceptionTypes[i] {
+ // HACK: we can not get error type in
config file during runtime, so we use exception str
+ if strings.Contains(err.Error(),
exceptions[i]) {
+ hierarchicalProcessContext :=
processContext.(HierarchicalProcessContext)
+
hierarchicalProcessContext.SetVariable(constant.VarNameCurrentExceptionRoute,
exceptionMatch.Next())
+ return
+ }
+ }
+ }
+ }
+ }
+
+ log.Error("Task execution failed and no catches configured")
+ hierarchicalProcessContext :=
processContext.(HierarchicalProcessContext)
+
hierarchicalProcessContext.SetVariable(constant.VarNameIsExceptionNotCatch,
true)
+}
+
+// GetOriginStateName get origin state name without suffix like fork
+func GetOriginStateName(stateInstance statelang.StateInstance) string {
+ stateName := stateInstance.Name()
+ if stateName != "" {
+ end := strings.LastIndex(stateName,
constant.LoopStateNamePattern)
+ if end > -1 {
+ return stateName[:end+1]
+ }
+ }
+ return stateName
+}
+
+// IsTimeout test if is timeout
+func IsTimeout(gmtUpdated time.Time, timeoutMillis int) bool {
+ if timeoutMillis < 0 {
+ return false
+ }
+ return time.Now().Unix()-gmtUpdated.Unix() > int64(timeoutMillis)
+}
+
+func GenerateParentId(stateInstance statelang.StateInstance) string {
+ return stateInstance.MachineInstanceID() + constant.SeperatorParentId +
stateInstance.ID()
+}
diff --git a/pkg/saga/statemachine/engine/events/event.go
b/pkg/saga/statemachine/engine/core/event.go
similarity index 63%
rename from pkg/saga/statemachine/engine/events/event.go
rename to pkg/saga/statemachine/engine/core/event.go
index 8f142713..4f7c18b2 100644
--- a/pkg/saga/statemachine/engine/events/event.go
+++ b/pkg/saga/statemachine/engine/core/event.go
@@ -1,4 +1,4 @@
-package events
+package core
type Event interface {
}
diff --git a/pkg/saga/statemachine/engine/core/event_bus.go
b/pkg/saga/statemachine/engine/core/event_bus.go
new file mode 100644
index 00000000..47d617d4
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/event_bus.go
@@ -0,0 +1,113 @@
+package core
+
+import (
+ "context"
+ "fmt"
+ "github.com/pkg/errors"
+ "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+ "github.com/seata/seata-go/pkg/util/collection"
+ "github.com/seata/seata-go/pkg/util/log"
+)
+
+type EventBus interface {
+ Offer(ctx context.Context, event Event) (bool, error)
+
+ EventConsumerList(event Event) []EventConsumer
+
+ RegisterEventConsumer(consumer EventConsumer)
+}
+
+type BaseEventBus struct {
+ eventConsumerList []EventConsumer
+}
+
+func (b *BaseEventBus) RegisterEventConsumer(consumer EventConsumer) {
+ if b.eventConsumerList == nil {
+ b.eventConsumerList = make([]EventConsumer, 0)
+ }
+ b.eventConsumerList = append(b.eventConsumerList, consumer)
+}
+
+func (b *BaseEventBus) EventConsumerList(event Event) []EventConsumer {
+ var acceptedConsumerList = make([]EventConsumer, 0)
+ for i := range b.eventConsumerList {
+ eventConsumer := b.eventConsumerList[i]
+ if eventConsumer.Accept(event) {
+ acceptedConsumerList = append(acceptedConsumerList,
eventConsumer)
+ }
+ }
+ return acceptedConsumerList
+}
+
+type DirectEventBus struct {
+ BaseEventBus
+}
+
+func (d DirectEventBus) Offer(ctx context.Context, event Event) (bool, error) {
+ eventConsumerList := d.EventConsumerList(event)
+ if len(eventConsumerList) == 0 {
+ log.Debugf("cannot find event handler by type: %T", event)
+ return false, nil
+ }
+
+ isFirstEvent := true
+ processContext, ok := event.(ProcessContext)
+ if !ok {
+ log.Errorf("event %T is illegal, required
process_ctrl.ProcessContext", event)
+ return false, nil
+ }
+
+ stack :=
processContext.GetVariable(constant.VarNameSyncExeStack).(*collection.Stack)
+ if stack == nil {
+ stack = collection.NewStack()
+ processContext.SetVariable(constant.VarNameSyncExeStack, stack)
+ isFirstEvent = true
+ }
+
+ stack.Push(processContext)
+ if isFirstEvent {
+ for stack.Len() > 0 {
+ currentContext := stack.Pop().(ProcessContext)
+ for _, eventConsumer := range eventConsumerList {
+ err := eventConsumer.Process(ctx,
currentContext)
+ if err != nil {
+ log.Errorf("process event %T error:
%s", event, err.Error())
+ return false, err
+ }
+ }
+ }
+ }
+
+ return true, nil
+}
+
+type AsyncEventBus struct {
+ BaseEventBus
+}
+
+func (a AsyncEventBus) Offer(ctx context.Context, event Event) (bool, error) {
+ eventConsumerList := a.EventConsumerList(event)
+ if len(eventConsumerList) == 0 {
+ errStr := fmt.Sprintf("cannot find event handler by type: %T",
event)
+ log.Errorf(errStr)
+ return false, errors.New(errStr)
+ }
+
+ processContext, ok := event.(ProcessContext)
+ if !ok {
+ errStr := fmt.Sprintf("event %T is illegal, required
process_ctrl.ProcessContext", event)
+ log.Errorf(errStr)
+ return false, errors.New(errStr)
+ }
+
+ for _, eventConsumer := range eventConsumerList {
+ go func() {
+ err := eventConsumer.Process(ctx, processContext)
+ if err != nil {
+ log.Errorf("process event %T error: %s", event,
err.Error())
+ }
+ }()
+ }
+
+ return true, nil
+}
diff --git a/pkg/saga/statemachine/engine/events/event_consumer.go
b/pkg/saga/statemachine/engine/core/event_consumer.go
similarity index 53%
rename from pkg/saga/statemachine/engine/events/event_consumer.go
rename to pkg/saga/statemachine/engine/core/event_consumer.go
index 426da15e..1df21bdb 100644
--- a/pkg/saga/statemachine/engine/events/event_consumer.go
+++ b/pkg/saga/statemachine/engine/core/event_consumer.go
@@ -1,8 +1,9 @@
-package events
+package core
import (
"context"
- "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
+ "fmt"
+ "github.com/pkg/errors"
)
type EventConsumer interface {
@@ -12,6 +13,7 @@ type EventConsumer interface {
}
type ProcessCtrlEventConsumer struct {
+ processController ProcessController
}
func (p ProcessCtrlEventConsumer) Accept(event Event) bool {
@@ -19,11 +21,14 @@ func (p ProcessCtrlEventConsumer) Accept(event Event) bool {
return false
}
- _, ok := event.(process_ctrl.ProcessContext)
+ _, ok := event.(ProcessContext)
return ok
}
func (p ProcessCtrlEventConsumer) Process(ctx context.Context, event Event)
error {
- //TODO implement me
- panic("implement me")
+ processContext, ok := event.(ProcessContext)
+ if !ok {
+ return errors.New(fmt.Sprint("event %T is illegal, required
process_ctrl.ProcessContext", event))
+ }
+ return p.processController.Process(ctx, processContext)
}
diff --git a/pkg/saga/statemachine/engine/events/event_publisher.go
b/pkg/saga/statemachine/engine/core/event_publisher.go
similarity index 96%
rename from pkg/saga/statemachine/engine/events/event_publisher.go
rename to pkg/saga/statemachine/engine/core/event_publisher.go
index 1fbfaec2..9ad86940 100644
--- a/pkg/saga/statemachine/engine/events/event_publisher.go
+++ b/pkg/saga/statemachine/engine/core/event_publisher.go
@@ -1,4 +1,4 @@
-package events
+package core
import "context"
diff --git a/pkg/saga/statemachine/engine/core/instruction.go
b/pkg/saga/statemachine/engine/core/instruction.go
new file mode 100644
index 00000000..ed81fc9c
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/instruction.go
@@ -0,0 +1,96 @@
+package core
+
+import (
+ "errors"
+ "fmt"
+ "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+)
+
+type Instruction interface {
+}
+
+type StateInstruction struct {
+ stateName string
+ stateMachineName string
+ tenantId string
+ end bool
+ temporaryState statelang.State
+}
+
+func NewStateInstruction(stateMachineName string, tenantId string)
*StateInstruction {
+ return &StateInstruction{stateMachineName: stateMachineName, tenantId:
tenantId}
+}
+
+func (s *StateInstruction) StateName() string {
+ return s.stateName
+}
+
+func (s *StateInstruction) SetStateName(stateName string) {
+ s.stateName = stateName
+}
+
+func (s *StateInstruction) StateMachineName() string {
+ return s.stateMachineName
+}
+
+func (s *StateInstruction) SetStateMachineName(stateMachineName string) {
+ s.stateMachineName = stateMachineName
+}
+
+func (s *StateInstruction) TenantId() string {
+ return s.tenantId
+}
+
+func (s *StateInstruction) SetTenantId(tenantId string) {
+ s.tenantId = tenantId
+}
+
+func (s *StateInstruction) End() bool {
+ return s.end
+}
+
+func (s *StateInstruction) SetEnd(end bool) {
+ s.end = end
+}
+
+func (s *StateInstruction) TemporaryState() statelang.State {
+ return s.temporaryState
+}
+
+func (s *StateInstruction) SetTemporaryState(temporaryState statelang.State) {
+ s.temporaryState = temporaryState
+}
+
+func (s *StateInstruction) GetState(context ProcessContext) (statelang.State,
error) {
+ if s.temporaryState != nil {
+ return s.temporaryState, nil
+ }
+
+ if s.stateMachineName == "" {
+ return nil, errors.New("stateMachineName is required")
+ }
+
+ stateMachineConfig, ok :=
context.GetVariable(constant.VarNameStateMachineConfig).(StateMachineConfig)
+ if !ok {
+ return nil, errors.New("stateMachineConfig is required in
context")
+ }
+ stateMachine, err :=
stateMachineConfig.StateMachineRepository().GetLastVersionStateMachine(s.stateMachineName,
s.tenantId)
+ if err != nil {
+ return nil, errors.New("get stateMachine in state machine
repository error")
+ }
+ if stateMachine == nil {
+ return nil, errors.New(fmt.Sprintf("stateMachine [%s] is not
exist", s.stateMachineName))
+ }
+
+ if s.stateName == "" {
+ s.stateName = stateMachine.StartState()
+ }
+
+ state := stateMachine.States()[s.stateName]
+ if state == nil {
+ return nil, errors.New(fmt.Sprintf("state [%s] is not exist",
s.stateName))
+ }
+
+ return state, nil
+}
diff --git a/pkg/saga/statemachine/engine/core/loop_context_holder.go
b/pkg/saga/statemachine/engine/core/loop_context_holder.go
new file mode 100644
index 00000000..b1f69fe8
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/loop_context_holder.go
@@ -0,0 +1,92 @@
+package core
+
+import (
+ "context"
+ "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+ "sync"
+)
+
+type LoopContextHolder struct {
+ nrOfInstances int32
+ nrOfActiveInstances int32
+ nrOfCompletedInstances int32
+ failEnd bool
+ completionConditionSatisfied bool
+ loopCounterStack []int
+ forwardCounterStack []int
+ collection interface{}
+}
+
+func NewLoopContextHolder() *LoopContextHolder {
+ return &LoopContextHolder{
+ nrOfInstances: 0,
+ nrOfActiveInstances: 0,
+ nrOfCompletedInstances: 0,
+ failEnd: false,
+ completionConditionSatisfied: false,
+ loopCounterStack: make([]int, 0),
+ forwardCounterStack: make([]int, 0),
+ collection: nil,
+ }
+}
+
+func GetCurrentLoopContextHolder(ctx context.Context, processContext
ProcessContext, forceCreate bool) *LoopContextHolder {
+ mutex :=
processContext.GetVariable(constant.VarNameProcessContextMutexLock).(*sync.Mutex)
+ mutex.Lock()
+ defer mutex.Unlock()
+
+ loopContextHolder :=
processContext.GetVariable(constant.VarNameCurrentLoopContextHolder).(*LoopContextHolder)
+ if loopContextHolder == nil && forceCreate {
+ loopContextHolder = &LoopContextHolder{}
+
processContext.SetVariable(constant.VarNameCurrentLoopContextHolder,
loopContextHolder)
+ }
+ return loopContextHolder
+}
+
+func ClearCurrent(ctx context.Context, processContext ProcessContext) {
+ processContext.RemoveVariable(constant.VarNameCurrentLoopContextHolder)
+}
+
+func (l *LoopContextHolder) NrOfInstances() int32 {
+ return l.nrOfInstances
+}
+
+func (l *LoopContextHolder) NrOfActiveInstances() int32 {
+ return l.nrOfActiveInstances
+}
+
+func (l *LoopContextHolder) NrOfCompletedInstances() int32 {
+ return l.nrOfCompletedInstances
+}
+
+func (l *LoopContextHolder) FailEnd() bool {
+ return l.failEnd
+}
+
+func (l *LoopContextHolder) SetFailEnd(failEnd bool) {
+ l.failEnd = failEnd
+}
+
+func (l *LoopContextHolder) CompletionConditionSatisfied() bool {
+ return l.completionConditionSatisfied
+}
+
+func (l *LoopContextHolder)
SetCompletionConditionSatisfied(completionConditionSatisfied bool) {
+ l.completionConditionSatisfied = completionConditionSatisfied
+}
+
+func (l *LoopContextHolder) LoopCounterStack() []int {
+ return l.loopCounterStack
+}
+
+func (l *LoopContextHolder) ForwardCounterStack() []int {
+ return l.forwardCounterStack
+}
+
+func (l *LoopContextHolder) Collection() interface{} {
+ return l.collection
+}
+
+func (l *LoopContextHolder) SetCollection(collection interface{}) {
+ l.collection = collection
+}
diff --git a/pkg/saga/statemachine/engine/core/loop_task_utils.go
b/pkg/saga/statemachine/engine/core/loop_task_utils.go
new file mode 100644
index 00000000..d56ac7f0
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/loop_task_utils.go
@@ -0,0 +1,40 @@
+package core
+
+import (
+ "context"
+ "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
+ "github.com/seata/seata-go/pkg/util/log"
+)
+
+func GetLoopConfig(ctx context.Context, processContext ProcessContext,
currentState statelang.State) state.Loop {
+ if matchLoop(currentState) {
+ taskState := currentState.(state.AbstractTaskState)
+ stateMachineInstance :=
processContext.GetVariable(constant.VarNameStateMachineInst).(statelang.StateMachineInstance)
+ stateMachineConfig :=
processContext.GetVariable(constant.VarNameStateMachineConfig).(StateMachineConfig)
+
+ if taskState.Loop() != nil {
+ loop := taskState.Loop()
+ collectionName := loop.Collection()
+ if collectionName != "" {
+ expression :=
CreateValueExpression(stateMachineConfig.ExpressionResolver(), collectionName)
+ collection := GetValue(expression,
stateMachineInstance.Context(), nil)
+ collectionList := collection.([]any)
+ if len(collectionList) > 0 {
+ current :=
GetCurrentLoopContextHolder(ctx, processContext, true)
+ current.SetCollection(collection)
+ return loop
+ }
+ }
+ log.Warn("State [{}] loop collection param [{}]
invalid", currentState.Name(), collectionName)
+ }
+
+ }
+ return nil
+}
+
+func matchLoop(currentState statelang.State) bool {
+ return currentState != nil && (constant.StateTypeServiceTask ==
currentState.Type() ||
+ constant.StateTypeScriptTask == currentState.Type() ||
constant.StateTypeSubStateMachine == currentState.Type())
+}
diff --git a/pkg/saga/statemachine/engine/core/parameter_utils.go
b/pkg/saga/statemachine/engine/core/parameter_utils.go
new file mode 100644
index 00000000..680167cf
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/parameter_utils.go
@@ -0,0 +1,132 @@
+package core
+
+import (
+ "fmt"
+ "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/expr"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
+ "strings"
+ "sync"
+)
+
+func CreateInputParams(processContext ProcessContext, expressionResolver
expr.ExpressionResolver,
+ stateInstance *statelang.StateInstanceImpl, serviceTaskState
*state.AbstractTaskState, variablesFrom any) []any {
+ inputAssignments := serviceTaskState.Input()
+ if inputAssignments == nil || len(inputAssignments) == 0 {
+ return inputAssignments
+ }
+
+ inputExpressions := serviceTaskState.InputExpressions()
+ if inputExpressions == nil || len(inputExpressions) == 0 {
+ lock :=
processContext.GetVariable(constant.VarNameProcessContextMutexLock).(*sync.Mutex)
+ lock.Lock()
+ defer lock.Unlock()
+ inputExpressions = serviceTaskState.InputExpressions()
+ if inputExpressions == nil || len(inputExpressions) == 0 {
+ inputExpressions = make([]any, 0, len(inputAssignments))
+
+ for _, assignment := range inputAssignments {
+ inputExpressions = append(inputExpressions,
CreateValueExpression(expressionResolver, assignment))
+ }
+ }
+ serviceTaskState.SetInputExpressions(inputExpressions)
+ }
+ inputValues := make([]any, 0, len(inputExpressions))
+ for _, valueExpression := range inputExpressions {
+ value := GetValue(valueExpression, variablesFrom, stateInstance)
+ inputValues = append(inputValues, value)
+ }
+
+ return inputValues
+}
+
+func CreateOutputParams(config StateMachineConfig, expressionResolver
expr.ExpressionResolver,
+ serviceTaskState *state.AbstractTaskState, variablesFrom any)
(map[string]any, error) {
+ outputAssignments := serviceTaskState.Output()
+ if outputAssignments == nil || len(outputAssignments) == 0 {
+ return make(map[string]any, 0), nil
+ }
+
+ outputExpressions := serviceTaskState.OutputExpressions()
+ if outputExpressions == nil {
+ config.ComponentLock().Lock()
+ defer config.ComponentLock().Unlock()
+ outputExpressions = serviceTaskState.OutputExpressions()
+ if outputExpressions == nil {
+ outputExpressions = make(map[string]any,
len(outputAssignments))
+ for key, value := range outputAssignments {
+ outputExpressions[key] =
CreateValueExpression(expressionResolver, value)
+ }
+ }
+ serviceTaskState.SetOutputExpressions(outputExpressions)
+ }
+ outputValues := make(map[string]any, len(outputExpressions))
+ for paramName, _ := range outputExpressions {
+ outputValues[paramName] =
GetValue(outputExpressions[paramName], variablesFrom, nil)
+ }
+ return outputValues, nil
+}
+
+func CreateValueExpression(expressionResolver expr.ExpressionResolver,
paramAssignment any) any {
+ var valueExpression any
+
+ switch paramAssignment.(type) {
+ case expr.Expression:
+ valueExpression = paramAssignment
+ case map[string]any:
+ paramMapAssignment := paramAssignment.(map[string]any)
+ paramMap := make(map[string]any, len(paramMapAssignment))
+ for key, value := range paramMapAssignment {
+ paramMap[key] =
CreateValueExpression(expressionResolver, value)
+ }
+ valueExpression = paramMap
+ case []any:
+ paramListAssignment := paramAssignment.([]any)
+ paramList := make([]any, 0, len(paramListAssignment))
+ for _, value := range paramListAssignment {
+ paramList = append(paramList,
CreateValueExpression(expressionResolver, value))
+ }
+ valueExpression = paramList
+ case string:
+ value := paramAssignment.(string)
+ if !strings.HasPrefix(value, "$") {
+ valueExpression = paramAssignment
+ }
+ valueExpression = expressionResolver.Expression(value)
+ default:
+ valueExpression = paramAssignment
+ }
+ return valueExpression
+}
+
+func GetValue(valueExpression any, variablesFrom any, stateInstance
statelang.StateInstance) any {
+ switch valueExpression.(type) {
+ case expr.Expression:
+ expression := valueExpression.(expr.Expression)
+ value := expression.Value(variablesFrom)
+ if _, ok := valueExpression.(expr.SequenceExpression); value !=
nil && stateInstance != nil && stateInstance.BusinessKey() == "" && ok {
+ stateInstance.SetBusinessKey(fmt.Sprintf("%v", value))
+ }
+ return value
+ case map[string]any:
+ mapValueExpression := valueExpression.(map[string]any)
+ mapValue := make(map[string]any, len(mapValueExpression))
+ for key, value := range mapValueExpression {
+ value = GetValue(value, variablesFrom, stateInstance)
+ if value != nil {
+ mapValue[key] = value
+ }
+ }
+ return mapValue
+ case []any:
+ valueExpressionList := valueExpression.([]any)
+ listValue := make([]any, 0, len(valueExpression.([]any)))
+ for i, _ := range valueExpressionList {
+ listValue = append(listValue,
GetValue(valueExpressionList[i], variablesFrom, stateInstance))
+ }
+ return listValue
+ default:
+ return valueExpression
+ }
+}
diff --git a/pkg/saga/statemachine/engine/process_ctrl/process_context.go
b/pkg/saga/statemachine/engine/core/process_context.go
similarity index 99%
rename from pkg/saga/statemachine/engine/process_ctrl/process_context.go
rename to pkg/saga/statemachine/engine/core/process_context.go
index 7d02e470..9da5c3e8 100644
--- a/pkg/saga/statemachine/engine/process_ctrl/process_context.go
+++ b/pkg/saga/statemachine/engine/core/process_context.go
@@ -1,4 +1,4 @@
-package process_ctrl
+package core
import (
"sync"
diff --git a/pkg/saga/statemachine/engine/core/process_controller.go
b/pkg/saga/statemachine/engine/core/process_controller.go
new file mode 100644
index 00000000..2f45aebd
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/process_controller.go
@@ -0,0 +1,31 @@
+package core
+
+import (
+ "context"
+)
+
+type ProcessController interface {
+ Process(ctx context.Context, context ProcessContext) error
+}
+
+type ProcessControllerImpl struct {
+ businessProcessor BusinessProcessor
+}
+
+func (p *ProcessControllerImpl) Process(ctx context.Context, context
ProcessContext) error {
+ if err := p.businessProcessor.Process(ctx, context); err != nil {
+ return err
+ }
+ if err := p.businessProcessor.Route(ctx, context); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (p *ProcessControllerImpl) BusinessProcessor() BusinessProcessor {
+ return p.businessProcessor
+}
+
+func (p *ProcessControllerImpl) SetBusinessProcessor(businessProcessor
BusinessProcessor) {
+ p.businessProcessor = businessProcessor
+}
diff --git
a/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go
b/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go
new file mode 100644
index 00000000..426ec65d
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go
@@ -0,0 +1,424 @@
+package core
+
+import (
+ "context"
+ "fmt"
+ "github.com/pkg/errors"
+ "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/exception"
+ "github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl/process"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
+ seataErrors "github.com/seata/seata-go/pkg/util/errors"
+ "github.com/seata/seata-go/pkg/util/log"
+ "time"
+)
+
+type ProcessCtrlStateMachineEngine struct {
+ StateMachineConfig StateMachineConfig
+}
+
+func NewProcessCtrlStateMachineEngine() *ProcessCtrlStateMachineEngine {
+ return &ProcessCtrlStateMachineEngine{
+ StateMachineConfig: NewDefaultStateMachineConfig(),
+ }
+}
+
+func (p ProcessCtrlStateMachineEngine) Start(ctx context.Context,
stateMachineName string, tenantId string, startParams map[string]interface{})
(statelang.StateMachineInstance, error) {
+ return p.startInternal(ctx, stateMachineName, tenantId, "",
startParams, false, nil)
+}
+
+func (p ProcessCtrlStateMachineEngine) Compensate(ctx context.Context,
stateMachineInstId string,
+ replaceParams map[string]any) (statelang.StateMachineInstance, error) {
+ return p.compensateInternal(ctx, stateMachineInstId, replaceParams,
false, nil)
+}
+
+func (p ProcessCtrlStateMachineEngine) startInternal(ctx context.Context,
stateMachineName string, tenantId string,
+ businessKey string, startParams map[string]interface{}, async bool,
callback CallBack) (statelang.StateMachineInstance, error) {
+ if tenantId == "" {
+ tenantId = p.StateMachineConfig.DefaultTenantId()
+ }
+
+ stateMachineInstance, err := p.createMachineInstance(stateMachineName,
tenantId, businessKey, startParams)
+ if err != nil {
+ return nil, err
+ }
+
+ // Build the process_ctrl context.
+ processContextBuilder := NewProcessContextBuilder().
+ WithProcessType(process.StateLang).
+ WithOperationName(constant.OperationNameStart).
+ WithAsyncCallback(callback).
+ WithInstruction(NewStateInstruction(stateMachineName,
tenantId)).
+ WithStateMachineInstance(stateMachineInstance).
+ WithStateMachineConfig(p.StateMachineConfig).
+ WithStateMachineEngine(p).
+ WithIsAsyncExecution(async)
+
+ contextMap := p.copyMap(startParams)
+
+ stateMachineInstance.SetContext(contextMap)
+
+ processContext :=
processContextBuilder.WithStateMachineContextVariables(contextMap).Build()
+
+ if stateMachineInstance.StateMachine().IsPersist() &&
p.StateMachineConfig.StateLogStore() != nil {
+ err :=
p.StateMachineConfig.StateLogStore().RecordStateMachineStarted(ctx,
stateMachineInstance, processContext)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ if stateMachineInstance.ID() == "" {
+
stateMachineInstance.SetID(p.StateMachineConfig.SeqGenerator().GenerateId(constant.SeqEntityStateMachineInst,
""))
+ }
+
+ var eventPublisher EventPublisher
+ if async {
+ eventPublisher = p.StateMachineConfig.AsyncEventPublisher()
+ } else {
+ eventPublisher = p.StateMachineConfig.EventPublisher()
+ }
+
+ _, err = eventPublisher.PushEvent(ctx, processContext)
+ if err != nil {
+ return nil, err
+ }
+
+ return stateMachineInstance, nil
+}
+
+// copyMap not deep copy, so best practice: Don’t pass by reference
+func (p ProcessCtrlStateMachineEngine) copyMap(startParams
map[string]interface{}) map[string]interface{} {
+ copyMap := make(map[string]interface{}, len(startParams))
+ for k, v := range startParams {
+ copyMap[k] = v
+ }
+ return copyMap
+}
+
+func (p ProcessCtrlStateMachineEngine) createMachineInstance(stateMachineName
string, tenantId string, businessKey string, startParams
map[string]interface{}) (statelang.StateMachineInstance, error) {
+ stateMachine, err :=
p.StateMachineConfig.StateMachineRepository().GetLastVersionStateMachine(stateMachineName,
tenantId)
+ if err != nil {
+ return nil, err
+ }
+
+ if stateMachine == nil {
+ return nil, errors.New("StateMachine [" + stateMachineName + "]
is not exists")
+ }
+
+ stateMachineInstance := statelang.NewStateMachineInstanceImpl()
+ stateMachineInstance.SetStateMachine(stateMachine)
+ stateMachineInstance.SetTenantID(tenantId)
+ stateMachineInstance.SetBusinessKey(businessKey)
+ stateMachineInstance.SetStartParams(startParams)
+ if startParams != nil {
+ if businessKey != "" {
+ startParams[constant.VarNameBusinesskey] = businessKey
+ }
+
+ if startParams[constant.VarNameParentId] != nil {
+ parentId, ok :=
startParams[constant.VarNameParentId].(string)
+ if !ok {
+
+ }
+ stateMachineInstance.SetParentID(parentId)
+ delete(startParams, constant.VarNameParentId)
+ }
+ }
+
+ stateMachineInstance.SetStatus(statelang.RU)
+ stateMachineInstance.SetRunning(true)
+
+ now := time.Now()
+ stateMachineInstance.SetStartedTime(now)
+ stateMachineInstance.SetUpdatedTime(now)
+ return stateMachineInstance, nil
+}
+
+func (p ProcessCtrlStateMachineEngine) compensateInternal(ctx context.Context,
stateMachineInstId string, replaceParams map[string]any,
+ async bool, callback CallBack) (statelang.StateMachineInstance, error) {
+ stateMachineInstance, err := p.reloadStateMachineInstance(ctx,
stateMachineInstId)
+ if err != nil {
+ return nil, err
+ }
+
+ if stateMachineInstance == nil {
+ return nil,
exception.NewEngineExecutionException(seataErrors.StateMachineInstanceNotExists,
+ "StateMachineInstance is not exits", nil)
+ }
+
+ if statelang.SU == stateMachineInstance.CompensationStatus() {
+ return stateMachineInstance, nil
+ }
+
+ if stateMachineInstance.CompensationStatus() != "" {
+ denyStatus := make([]statelang.ExecutionStatus, 0)
+ denyStatus = append(denyStatus, statelang.SU)
+ p.checkStatus(ctx, stateMachineInstance, nil, denyStatus, "",
stateMachineInstance.CompensationStatus(),
+ "compensate")
+ }
+
+ if replaceParams != nil {
+ for key, value := range replaceParams {
+ stateMachineInstance.EndParams()[key] = value
+ }
+ }
+
+ contextBuilder :=
NewProcessContextBuilder().WithProcessType(process.StateLang).
+
WithOperationName(constant.OperationNameCompensate).WithAsyncCallback(callback).
+ WithStateMachineInstance(stateMachineInstance).
+
WithStateMachineConfig(p.StateMachineConfig).WithStateMachineEngine(p).WithIsAsyncExecution(async)
+
+ context := contextBuilder.Build()
+
+ contextVariables, err := p.getStateMachineContextVariables(ctx,
stateMachineInstance)
+
+ if replaceParams != nil {
+ for key, value := range replaceParams {
+ contextVariables[key] = value
+ }
+ }
+
+ p.putBusinesskeyToContextariables(stateMachineInstance,
contextVariables)
+
+ // TODO: Here is not use sync.map, make sure whether to use it
+ concurrentContextVariables := make(map[string]any)
+ p.nullSafeCopy(contextVariables, concurrentContextVariables)
+
+ context.SetVariable(constant.VarNameStateMachineContext,
concurrentContextVariables)
+ stateMachineInstance.SetContext(concurrentContextVariables)
+
+ tempCompensationTriggerState := state.NewCompensationTriggerStateImpl()
+
tempCompensationTriggerState.SetStateMachine(stateMachineInstance.StateMachine())
+
+ stateMachineInstance.SetRunning(true)
+
+ log.Info("Operation [compensate] start. stateMachineInstance[id:" +
stateMachineInstance.ID() + "]")
+
+ if stateMachineInstance.StateMachine().IsPersist() {
+ err :=
p.StateMachineConfig.StateLogStore().RecordStateMachineRestarted(ctx,
stateMachineInstance, context)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ inst := NewStateInstruction(stateMachineInstance.TenantID(),
stateMachineInstance.StateMachine().Name())
+ inst.SetTemporaryState(tempCompensationTriggerState)
+ context.SetInstruction(inst)
+
+ if async {
+ _, err :=
p.StateMachineConfig.AsyncEventPublisher().PushEvent(ctx, context)
+ if err != nil {
+ return nil, err
+ }
+ } else {
+ _, err := p.StateMachineConfig.EventPublisher().PushEvent(ctx,
context)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ return stateMachineInstance, nil
+}
+
+func (p ProcessCtrlStateMachineEngine) reloadStateMachineInstance(ctx
context.Context, instId string) (statelang.StateMachineInstance, error) {
+ instance, err :=
p.StateMachineConfig.StateLogStore().GetStateMachineInstance(instId)
+ if err != nil {
+ return nil, err
+ }
+ if instance != nil {
+ stateMachine := instance.StateMachine()
+ if stateMachine == nil {
+ stateMachine, err =
p.StateMachineConfig.StateMachineRepository().GetStateMachineById(instance.MachineID())
+ if err != nil {
+ return nil, err
+ }
+ instance.SetStateMachine(stateMachine)
+ }
+ if stateMachine == nil {
+ return nil,
exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
+ "StateMachine[id:"+instance.MachineID()+"] not
exist.", nil)
+ }
+
+ stateList := instance.StateList()
+ if stateList == nil || len(stateList) == 0 {
+ stateList, err =
p.StateMachineConfig.StateLogStore().GetStateInstanceListByMachineInstanceId(instId)
+ if err != nil {
+ return nil, err
+ }
+ if stateList != nil && len(stateList) > 0 {
+ for _, tmpStateInstance := range stateList {
+
instance.PutState(tmpStateInstance.ID(), tmpStateInstance)
+ }
+ }
+ }
+
+ if instance.EndParams() == nil || len(instance.EndParams()) ==
0 {
+ variables, err := p.replayContextVariables(ctx,
instance)
+ if err != nil {
+ return nil, err
+ }
+ instance.SetEndParams(variables)
+ }
+ }
+ return instance, nil
+}
+
+func (p ProcessCtrlStateMachineEngine) replayContextVariables(ctx
context.Context, stateMachineInstance statelang.StateMachineInstance)
(map[string]any, error) {
+ contextVariables := make(map[string]any)
+ if stateMachineInstance.StartParams() != nil {
+ for key, value := range stateMachineInstance.StartParams() {
+ contextVariables[key] = value
+ }
+ }
+
+ stateInstanceList := stateMachineInstance.StateList()
+ if stateInstanceList == nil || len(stateInstanceList) == 0 {
+ return contextVariables, nil
+ }
+
+ for _, stateInstance := range stateInstanceList {
+ serviceOutputParams := stateInstance.OutputParams()
+ if serviceOutputParams != nil {
+ serviceTaskStateImpl, ok :=
stateMachineInstance.StateMachine().State(GetOriginStateName(stateInstance)).(*state.ServiceTaskStateImpl)
+ if !ok {
+ return nil,
exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
+ "Cannot find State by state name
["+stateInstance.Name()+"], may be this is a bug", nil)
+ }
+
+ if serviceTaskStateImpl.Output() != nil &&
len(serviceTaskStateImpl.Output()) != 0 {
+ outputVariablesToContext, err :=
CreateOutputParams(p.StateMachineConfig,
+
p.StateMachineConfig.ExpressionResolver(),
serviceTaskStateImpl.AbstractTaskState, serviceOutputParams)
+ if err != nil {
+ return nil,
exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
+ "Context variable replay
failed", err)
+ }
+ if outputVariablesToContext != nil &&
len(outputVariablesToContext) != 0 {
+ for key, value := range
outputVariablesToContext {
+ contextVariables[key] = value
+ }
+ }
+ if len(stateInstance.BusinessKey()) > 0 {
+
contextVariables[serviceTaskStateImpl.Name()+constant.VarNameBusinesskey] =
stateInstance.BusinessKey()
+ }
+ }
+ }
+ }
+
+ return contextVariables, nil
+}
+
+func (p ProcessCtrlStateMachineEngine) checkStatus(ctx context.Context,
stateMachineInstance statelang.StateMachineInstance,
+ acceptStatus []statelang.ExecutionStatus, denyStatus
[]statelang.ExecutionStatus, status statelang.ExecutionStatus,
+ compenStatus statelang.ExecutionStatus, operation string) (bool, error)
{
+ if status != "" && compenStatus != "" {
+ return false,
exception.NewEngineExecutionException(seataErrors.InvalidParameter,
+ "status and compensationStatus are not supported at the
same time", nil)
+ }
+ if status == "" && compenStatus == "" {
+ return false,
exception.NewEngineExecutionException(seataErrors.InvalidParameter,
+ "status and compensationStatus must input at least
one", nil)
+ }
+ if statelang.SU == compenStatus {
+ message := p.buildExceptionMessage(stateMachineInstance, nil,
nil, "", statelang.SU, operation)
+ return false,
exception.NewEngineExecutionException(seataErrors.OperationDenied,
+ message, nil)
+ }
+
+ if stateMachineInstance.IsRunning() &&
+ !IsTimeout(stateMachineInstance.UpdatedTime(),
p.StateMachineConfig.TransOperationTimeout()) {
+ return false,
exception.NewEngineExecutionException(seataErrors.OperationDenied,
+ "StateMachineInstance
[id:"+stateMachineInstance.ID()+"] is running, operation["+operation+
+ "] denied", nil)
+ }
+
+ if (denyStatus == nil || len(denyStatus) == 0) && (acceptStatus == nil
|| len(acceptStatus) == 0) {
+ return false,
exception.NewEngineExecutionException(seataErrors.InvalidParameter,
+ "StateMachineInstance[id:"+stateMachineInstance.ID()+
+ "], acceptable status and deny status must
input at least one", nil)
+ }
+
+ currentStatus := compenStatus
+ if status != "" {
+ currentStatus = status
+ }
+
+ if denyStatus != nil && len(denyStatus) == 0 {
+ for _, tempDenyStatus := range denyStatus {
+ if tempDenyStatus == currentStatus {
+ message :=
p.buildExceptionMessage(stateMachineInstance, acceptStatus, denyStatus, status,
+ compenStatus, operation)
+ return false,
exception.NewEngineExecutionException(seataErrors.OperationDenied,
+ message, nil)
+ }
+ }
+ }
+
+ if acceptStatus == nil || len(acceptStatus) == 0 {
+ return true, nil
+ } else {
+ for _, tempStatus := range acceptStatus {
+ if tempStatus == currentStatus {
+ return true, nil
+ }
+ }
+ }
+
+ message := p.buildExceptionMessage(stateMachineInstance, acceptStatus,
denyStatus, status, compenStatus,
+ operation)
+ return false,
exception.NewEngineExecutionException(seataErrors.OperationDenied,
+ message, nil)
+}
+
+func (p ProcessCtrlStateMachineEngine) getStateMachineContextVariables(ctx
context.Context,
+ stateMachineInstance statelang.StateMachineInstance) (map[string]any,
error) {
+ contextVariables := stateMachineInstance.EndParams()
+ if contextVariables == nil || len(contextVariables) == 0 {
+ return p.replayContextVariables(ctx, stateMachineInstance)
+ }
+ return contextVariables, nil
+}
+
+func (p ProcessCtrlStateMachineEngine) buildExceptionMessage(instance
statelang.StateMachineInstance,
+ acceptStatus []statelang.ExecutionStatus, denyStatus
[]statelang.ExecutionStatus, status statelang.ExecutionStatus,
+ compenStatus statelang.ExecutionStatus, operation string) string {
+ message := fmt.Sprintf("StateMachineInstance[id:%s]", instance.ID())
+ if len(acceptStatus) > 0 {
+ message += ",acceptable status :"
+ for _, tempStatus := range acceptStatus {
+ message += string(tempStatus) + " "
+ }
+ }
+
+ if len(denyStatus) > 0 {
+ message += ",deny status:"
+ for _, tempStatus := range denyStatus {
+ message += string(tempStatus) + " "
+ }
+ }
+
+ if status != "" {
+ message += ",current status:" + string(status)
+ }
+
+ if compenStatus != "" {
+ message += ",current compensation status:" +
string(compenStatus)
+ }
+
+ message += fmt.Sprintf(",so operation [%s] denied", operation)
+ return message
+}
+
+func (p ProcessCtrlStateMachineEngine)
putBusinesskeyToContextariables(instance statelang.StateMachineInstance,
variables map[string]any) {
+ if instance.BusinessKey() != "" &&
variables[constant.VarNameBusinesskey] == "" {
+ variables[constant.VarNameBusinesskey] = instance.BusinessKey()
+ }
+}
+
+func (p ProcessCtrlStateMachineEngine) nullSafeCopy(srcMap map[string]any,
destMap map[string]any) {
+ for key, value := range srcMap {
+ if value == nil {
+ destMap[key] = value
+ }
+ }
+}
diff --git a/pkg/saga/statemachine/engine/core/process_router.go
b/pkg/saga/statemachine/engine/core/process_router.go
new file mode 100644
index 00000000..5d7e5998
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/process_router.go
@@ -0,0 +1,194 @@
+package core
+
+import (
+ "context"
+ "github.com/pkg/errors"
+ "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+ "github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl/process"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+ "github.com/seata/seata-go/pkg/util/log"
+)
+
+type RouterHandler interface {
+ Route(ctx context.Context, processContext ProcessContext) error
+}
+
+type ProcessRouter interface {
+ Route(ctx context.Context, processContext ProcessContext) error
+}
+
+type InterceptAbleStateRouter interface {
+ StateRouter
+ StateRouterInterceptor() []StateRouterInterceptor
+ RegistryStateRouterInterceptor(stateRouterInterceptor
StateRouterInterceptor)
+}
+
+type StateRouter interface {
+ Route(ctx context.Context, processContext ProcessContext, state
statelang.State) (Instruction, error)
+}
+
+type StateRouterInterceptor interface {
+ PreRoute(ctx context.Context, processContext ProcessContext, state
statelang.State) error
+ PostRoute(ctx context.Context, processContext ProcessContext,
instruction Instruction, err error) error
+ Match(stateType string) bool
+}
+
+type DefaultRouterHandler struct {
+ eventPublisher EventPublisher
+ processRouters map[string]ProcessRouter
+}
+
+func (d *DefaultRouterHandler) Route(ctx context.Context, processContext
ProcessContext) error {
+ processType := d.matchProcessType(ctx, processContext)
+ if processType == "" {
+ log.Warnf("Process type not found, context= %s", processContext)
+ return errors.New("Process type not found")
+ }
+
+ processRouter := d.processRouters[string(processType)]
+ if processRouter == nil {
+ log.Errorf("Cannot find process router by type %s, context =
%s", processType, processContext)
+ return errors.New("Process router not found")
+ }
+
+ instruction := processRouter.Route(ctx, processContext)
+ if instruction == nil {
+ log.Info("route instruction is null, process end")
+ } else {
+ processContext.SetInstruction(instruction)
+ _, err := d.eventPublisher.PushEvent(ctx, processContext)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (d *DefaultRouterHandler) matchProcessType(ctx context.Context,
processContext ProcessContext) process.ProcessType {
+ processType, ok :=
processContext.GetVariable(constant.VarNameProcessType).(process.ProcessType)
+ if !ok || processType == "" {
+ processType = process.StateLang
+ }
+ return processType
+}
+
+func (d *DefaultRouterHandler) EventPublisher() EventPublisher {
+ return d.eventPublisher
+}
+
+func (d *DefaultRouterHandler) SetEventPublisher(eventPublisher
EventPublisher) {
+ d.eventPublisher = eventPublisher
+}
+
+func (d *DefaultRouterHandler) ProcessRouters() map[string]ProcessRouter {
+ return d.processRouters
+}
+
+func (d *DefaultRouterHandler) SetProcessRouters(processRouters
map[string]ProcessRouter) {
+ d.processRouters = processRouters
+}
+
+type StateMachineProcessRouter struct {
+ stateRouters map[string]StateRouter
+}
+
+func (s *StateMachineProcessRouter) Route(ctx context.Context, processContext
ProcessContext) (Instruction, error) {
+ stateInstruction, ok :=
processContext.GetInstruction().(StateInstruction)
+ if !ok {
+ return nil, errors.New("instruction is not a state instruction")
+ }
+
+ var state statelang.State
+ if stateInstruction.TemporaryState() != nil {
+ state = stateInstruction.TemporaryState()
+ stateInstruction.SetTemporaryState(nil)
+ } else {
+ stateMachineConfig, ok :=
processContext.GetVariable(constant.VarNameStateMachineConfig).(StateMachineConfig)
+ if !ok {
+ return nil, errors.New("state machine config not found")
+ }
+
+ stateMachine, err :=
stateMachineConfig.StateMachineRepository().GetStateMachineByNameAndTenantId(stateInstruction.StateMachineName(),
+ stateInstruction.TenantId())
+ if err != nil {
+ return nil, err
+ }
+
+ state = stateMachine.States()[stateInstruction.StateName()]
+ }
+
+ stateType := state.Type()
+ router := s.stateRouters[stateType]
+
+ var interceptors []StateRouterInterceptor
+ if interceptAbleStateRouter, ok := router.(InterceptAbleStateRouter);
ok {
+ interceptors = interceptAbleStateRouter.StateRouterInterceptor()
+ }
+
+ var executedInterceptors []StateRouterInterceptor
+ var exception error
+ instruction, exception := func() (Instruction, error) {
+ if interceptors == nil || len(executedInterceptors) == 0 {
+ executedInterceptors = make([]StateRouterInterceptor,
0, len(interceptors))
+ for _, interceptor := range interceptors {
+ executedInterceptors =
append(executedInterceptors, interceptor)
+ err := interceptor.PreRoute(ctx,
processContext, state)
+ if err != nil {
+ return nil, err
+ }
+ }
+ }
+
+ instruction, err := router.Route(ctx, processContext, state)
+ if err != nil {
+ return nil, err
+ }
+ return instruction, nil
+ }()
+
+ if interceptors == nil || len(executedInterceptors) == 0 {
+ for i := len(executedInterceptors) - 1; i >= 0; i-- {
+ err := executedInterceptors[i].PostRoute(ctx,
processContext, instruction, exception)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ // if 'Succeed' or 'Fail' State did not configured, we must end
the state machine
+ if instruction == nil && !stateInstruction.End() {
+ err := EndStateMachine(ctx, processContext)
+ if err != nil {
+ return nil, err
+ }
+ }
+ }
+
+ return instruction, nil
+}
+
+func (s *StateMachineProcessRouter) InitDefaultStateRouters() {
+ if s.stateRouters == nil || len(s.stateRouters) == 0 {
+ s.stateRouters = make(map[string]StateRouter)
+ taskStateRouter := &TaskStateRouter{}
+ s.stateRouters[constant.StateTypeServiceTask] = taskStateRouter
+ s.stateRouters[constant.StateTypeScriptTask] = taskStateRouter
+ s.stateRouters[constant.StateTypeChoice] = taskStateRouter
+ s.stateRouters[constant.StateTypeCompensationTrigger] =
taskStateRouter
+ s.stateRouters[constant.StateTypeSubStateMachine] =
taskStateRouter
+ s.stateRouters[constant.StateTypeCompensateSubMachine] =
taskStateRouter
+ s.stateRouters[constant.StateTypeLoopStart] = taskStateRouter
+
+ endStateRouter := &EndStateRouter{}
+ s.stateRouters[constant.StateTypeSucceed] = endStateRouter
+ s.stateRouters[constant.StateTypeFail] = endStateRouter
+ }
+}
+
+func (s *StateMachineProcessRouter) StateRouters() map[string]StateRouter {
+ return s.stateRouters
+}
+
+func (s *StateMachineProcessRouter) SetStateRouters(stateRouters
map[string]StateRouter) {
+ s.stateRouters = stateRouters
+}
diff --git
a/pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go
b/pkg/saga/statemachine/engine/core/process_state.go
similarity index 69%
rename from pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go
rename to pkg/saga/statemachine/engine/core/process_state.go
index 66338fd8..3dc06920 100644
--- a/pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go
+++ b/pkg/saga/statemachine/engine/core/process_state.go
@@ -1,8 +1,8 @@
-package process_ctrl
+package core
import (
"context"
- "github.com/pkg/errors"
+ "errors"
"sync"
)
@@ -11,20 +11,20 @@ type StateHandler interface {
ProcessHandler
}
-type StateRouter interface {
- State() string
- RouterHandler
-}
-
type InterceptAbleStateHandler interface {
StateHandler
StateHandlerInterceptorList() []StateHandlerInterceptor
RegistryStateHandlerInterceptor(stateHandlerInterceptor
StateHandlerInterceptor)
}
+type ProcessHandler interface {
+ Process(ctx context.Context, processContext ProcessContext) error
+}
+
type StateHandlerInterceptor interface {
PreProcess(ctx context.Context, processContext ProcessContext) error
PostProcess(ctx context.Context, processContext ProcessContext) error
+ Match(stateType string) bool
}
type StateMachineProcessHandler struct {
@@ -99,40 +99,3 @@ func (s *StateMachineProcessHandler)
RegistryStateHandler(stateType string, stat
}
s.mp[stateType] = stateHandler
}
-
-type StateMachineRouterHandler struct {
- mu sync.RWMutex
- mp map[string]StateRouter
-}
-
-func (s *StateMachineRouterHandler) Route(ctx context.Context, processContext
ProcessContext) error {
- stateInstruction, _ :=
processContext.GetInstruction().(StateInstruction)
-
- state, err := stateInstruction.GetState(processContext)
- if err != nil {
- return err
- }
-
- stateType := state.Type()
- stateRouter := s.GetStateRouter(stateType)
- if stateRouter == nil {
- return errors.New("Not support [" + stateType + "] state
router")
- }
-
- return stateRouter.Route(ctx, processContext)
-}
-
-func (s *StateMachineRouterHandler) GetStateRouter(stateType string)
StateRouter {
- s.mu.RLock()
- defer s.mu.RUnlock()
- return s.mp[stateType]
-}
-
-func (s *StateMachineRouterHandler) RegistryStateRouter(stateType string,
stateRouter StateRouter) {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.mp == nil {
- s.mp = make(map[string]StateRouter)
- }
- s.mp[stateType] = stateRouter
-}
diff --git a/pkg/saga/statemachine/engine/core/state_router.go
b/pkg/saga/statemachine/engine/core/state_router.go
new file mode 100644
index 00000000..185984e4
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/state_router.go
@@ -0,0 +1,151 @@
+package core
+
+import (
+ "context"
+ "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/exception"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+ sagaState
"github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
+ seataErrors "github.com/seata/seata-go/pkg/util/errors"
+ "github.com/seata/seata-go/pkg/util/log"
+)
+
+type EndStateRouter struct {
+}
+
+func (e EndStateRouter) Route(ctx context.Context, processContext
ProcessContext, state statelang.State) (Instruction, error) {
+ return nil, nil
+}
+
+type TaskStateRouter struct {
+}
+
+func (t TaskStateRouter) Route(ctx context.Context, processContext
ProcessContext, state statelang.State) (Instruction, error) {
+ stateInstruction, _ :=
processContext.GetInstruction().(StateInstruction)
+ if stateInstruction.End() {
+ log.Infof("StateInstruction is ended, Stop the StateMachine
executing. StateMachine[%s] Current State[%s]",
+ stateInstruction.StateMachineName(),
stateInstruction.StateName())
+ }
+
+ // check if in loop async condition
+ isLoop, ok :=
processContext.GetVariable(constant.VarNameIsLoopState).(bool)
+ if ok && isLoop {
+ log.Infof("StateMachine[%s] Current State[%s] is in loop async
condition, skip route processing.",
+ stateInstruction.StateMachineName(),
stateInstruction.StateName())
+ return nil, nil
+ }
+
+ // The current CompensationTriggerState can mark the compensation
process is started and perform compensation
+ // route processing.
+ compensationTriggerState, ok :=
processContext.GetVariable(constant.VarNameCurrentCompensateTriggerState).(statelang.State)
+ if ok {
+ return t.compensateRoute(ctx, processContext,
compensationTriggerState)
+ }
+
+ // There is an exception route, indicating that an exception is thrown,
and the exception route is prioritized.
+ next :=
processContext.GetVariable(constant.VarNameCurrentExceptionRoute).(string)
+
+ if next != "" {
+
processContext.RemoveVariable(constant.VarNameCurrentExceptionRoute)
+ } else {
+ next = state.Next()
+ }
+
+ // If next is empty, the state selected by the Choice state was taken.
+ if next == "" &&
processContext.HasVariable(constant.VarNameCurrentChoice) {
+ next =
processContext.GetVariable(constant.VarNameCurrentChoice).(string)
+ processContext.RemoveVariable(constant.VarNameCurrentChoice)
+ }
+
+ if next == "" {
+ return nil, nil
+ }
+
+ stateMachine := state.StateMachine()
+ nextState := stateMachine.State(next)
+ if nextState == nil {
+ return nil,
exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
+ "Next state["+next+"] is not exits", nil)
+ }
+
+ stateInstruction.SetStateName(next)
+
+ if nil != GetLoopConfig(ctx, processContext, nextState) {
+
stateInstruction.SetTemporaryState(sagaState.NewLoopStartStateImpl())
+ }
+
+ return stateInstruction, nil
+}
+
+func (t *TaskStateRouter) compensateRoute(ctx context.Context, processContext
ProcessContext,
+ compensationTriggerState statelang.State) (Instruction, error) {
+ //If there is already a compensation state that has been executed,
+ // it is judged whether it is wrong or unsuccessful,
+ // and the compensation process is interrupted.
+ isFirstCompensationStateStart :=
processContext.GetVariable(constant.VarNameFirstCompensationStateStarted).(bool)
+ if isFirstCompensationStateStart {
+ exception :=
processContext.GetVariable(constant.VarNameCurrentException).(error)
+ if exception != nil {
+ return nil, EndStateMachine(ctx, processContext)
+ }
+
+ stateInstance :=
processContext.GetVariable(constant.VarNameStateInst).(statelang.StateInstance)
+ if stateInstance != nil && statelang.SU !=
stateInstance.Status() {
+ return nil, EndStateMachine(ctx, processContext)
+ }
+ }
+
+ stateStackToBeCompensated := GetCurrentCompensationHolder(ctx,
processContext, true).StateStackNeedCompensation()
+ if stateStackToBeCompensated != nil {
+ stateToBeCompensated :=
stateStackToBeCompensated.Pop().(statelang.StateInstance)
+
+ stateMachine :=
processContext.GetVariable(constant.VarNameStateMachine).(statelang.StateMachine)
+ state :=
stateMachine.State(GetOriginStateName(stateToBeCompensated))
+ if taskState, ok := state.(sagaState.AbstractTaskState); ok {
+ instruction :=
processContext.GetInstruction().(StateInstruction)
+
+ var compensateState statelang.State
+ compensateStateName := taskState.CompensateState()
+ if len(compensateStateName) != 0 {
+ compensateState =
stateMachine.State(compensateStateName)
+ }
+
+ if subStateMachine, ok :=
state.(sagaState.SubStateMachine); compensateState == nil && ok {
+ compensateState =
subStateMachine.CompensateStateImpl()
+ instruction.SetTemporaryState(compensateState)
+ }
+
+ if compensateState == nil {
+ return nil, EndStateMachine(ctx, processContext)
+ }
+
+ instruction.SetStateName(compensateState.Name())
+
+ GetCurrentCompensationHolder(ctx, processContext,
true).AddToBeCompensatedState(compensateState.Name(),
+ stateToBeCompensated)
+
+ hierarchicalProcessContext :=
processContext.(HierarchicalProcessContext)
+
hierarchicalProcessContext.SetVariableLocally(constant.VarNameFirstCompensationStateStarted,
true)
+
+ if _, ok :=
compensateState.(sagaState.CompensateSubStateMachineState); ok {
+ hierarchicalProcessContext =
processContext.(HierarchicalProcessContext)
+ hierarchicalProcessContext.SetVariableLocally(
+
compensateState.Name()+constant.VarNameSubMachineParentId,
+ GenerateParentId(stateToBeCompensated))
+ }
+
+ return instruction, nil
+ }
+ }
+
+
processContext.RemoveVariable(constant.VarNameCurrentCompensateTriggerState)
+
+ compensationTriggerStateNext := compensationTriggerState.Next()
+ if compensationTriggerStateNext == "" {
+ return nil, EndStateMachine(ctx, processContext)
+ }
+
+ instruction := processContext.GetInstruction().(StateInstruction)
+ instruction.SetStateName(compensationTriggerStateNext)
+ return instruction, nil
+}
diff --git a/pkg/saga/statemachine/engine/statemachine_config.go
b/pkg/saga/statemachine/engine/core/statemachine_config.go
similarity index 52%
rename from pkg/saga/statemachine/engine/statemachine_config.go
rename to pkg/saga/statemachine/engine/core/statemachine_config.go
index 3f3c530b..9c77b73a 100644
--- a/pkg/saga/statemachine/engine/statemachine_config.go
+++ b/pkg/saga/statemachine/engine/core/statemachine_config.go
@@ -1,22 +1,20 @@
-package engine
+package core
import (
- "github.com/seata/seata-go/pkg/saga/statemachine/engine/events"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/expr"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/invoker"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence"
- "github.com/seata/seata-go/pkg/saga/statemachine/engine/status_decision"
- "github.com/seata/seata-go/pkg/saga/statemachine/engine/store"
+ "sync"
)
type StateMachineConfig interface {
- StateLogRepository() store.StateLogRepository
+ StateLogRepository() StateLogRepository
- StateMachineRepository() store.StateMachineRepository
+ StateMachineRepository() StateMachineRepository
- StateLogStore() store.StateLogStore
+ StateLogStore() StateLogStore
- StateLangStore() store.StateLangStore
+ StateLangStore() StateLangStore
ExpressionFactoryManager() expr.ExpressionFactoryManager
@@ -24,11 +22,11 @@ type StateMachineConfig interface {
SeqGenerator() sequence.SeqGenerator
- StatusDecisionStrategy() status_decision.StatusDecisionStrategy
+ StatusDecisionStrategy() StatusDecisionStrategy
- EventPublisher() events.EventPublisher
+ EventPublisher() EventPublisher
- AsyncEventPublisher() events.EventPublisher
+ AsyncEventPublisher() EventPublisher
ServiceInvokerManager() invoker.ServiceInvokerManager
@@ -41,4 +39,6 @@ type StateMachineConfig interface {
TransOperationTimeout() int
ServiceInvokeTimeout() int
+
+ ComponentLock() *sync.Mutex
}
diff --git a/pkg/saga/statemachine/engine/core/statemachine_engine.go
b/pkg/saga/statemachine/engine/core/statemachine_engine.go
new file mode 100644
index 00000000..c18d91c1
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/statemachine_engine.go
@@ -0,0 +1,16 @@
+package core
+
+import (
+ "context"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+)
+
+type StateMachineEngine interface {
+ Start(ctx context.Context, stateMachineName string, tenantId string,
startParams map[string]interface{}) (statelang.StateMachineInstance, error)
+ Compensate(ctx context.Context, stateMachineInstId string,
replaceParams map[string]any) (statelang.StateMachineInstance, error)
+}
+
+type CallBack interface {
+ OnFinished(ctx context.Context, context ProcessContext,
stateMachineInstance statelang.StateMachineInstance)
+ OnError(ctx context.Context, context ProcessContext,
stateMachineInstance statelang.StateMachineInstance, err error)
+}
diff --git a/pkg/saga/statemachine/engine/core/statemachine_engine_test.go
b/pkg/saga/statemachine/engine/core/statemachine_engine_test.go
new file mode 100644
index 00000000..e87eb9ca
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/statemachine_engine_test.go
@@ -0,0 +1,15 @@
+package core
+
+import (
+ "context"
+ "testing"
+)
+
+func TestEngine(t *testing.T) {
+
+}
+
+func TestSimpleStateMachine(t *testing.T) {
+ engine := NewProcessCtrlStateMachineEngine()
+ engine.Start(context.Background(), "simpleStateMachine", "tenantId",
nil)
+}
diff --git a/pkg/saga/statemachine/engine/store/statemachine_store.go
b/pkg/saga/statemachine/engine/core/statemachine_store.go
similarity index 83%
rename from pkg/saga/statemachine/engine/store/statemachine_store.go
rename to pkg/saga/statemachine/engine/core/statemachine_store.go
index 2dac06d3..7a763335 100644
--- a/pkg/saga/statemachine/engine/store/statemachine_store.go
+++ b/pkg/saga/statemachine/engine/core/statemachine_store.go
@@ -1,8 +1,7 @@
-package store
+package core
import (
"context"
- "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"io"
)
@@ -20,15 +19,15 @@ type StateLogRepository interface {
}
type StateLogStore interface {
- RecordStateMachineStarted(ctx context.Context, machineInstance
statelang.StateMachineInstance, context process_ctrl.ProcessContext) error
+ RecordStateMachineStarted(ctx context.Context, machineInstance
statelang.StateMachineInstance, context ProcessContext) error
- RecordStateMachineFinished(ctx context.Context, machineInstance
statelang.StateMachineInstance, context process_ctrl.ProcessContext) error
+ RecordStateMachineFinished(ctx context.Context, machineInstance
statelang.StateMachineInstance, context ProcessContext) error
- RecordStateMachineRestarted(ctx context.Context, machineInstance
statelang.StateMachineInstance, context process_ctrl.ProcessContext) error
+ RecordStateMachineRestarted(ctx context.Context, machineInstance
statelang.StateMachineInstance, context ProcessContext) error
- RecordStateStarted(ctx context.Context, stateInstance
statelang.StateInstance, context process_ctrl.ProcessContext) error
+ RecordStateStarted(ctx context.Context, stateInstance
statelang.StateInstance, context ProcessContext) error
- RecordStateFinished(ctx context.Context, stateInstance
statelang.StateInstance, context process_ctrl.ProcessContext) error
+ RecordStateFinished(ctx context.Context, stateInstance
statelang.StateInstance, context ProcessContext) error
GetStateMachineInstance(stateMachineInstanceId string)
(statelang.StateMachineInstance, error)
@@ -44,6 +43,8 @@ type StateLogStore interface {
type StateMachineRepository interface {
GetStateMachineById(stateMachineId string) (statelang.StateMachine,
error)
+ GetStateMachineByNameAndTenantId(stateMachineName string, tenantId
string) (statelang.StateMachine, error)
+
GetLastVersionStateMachine(stateMachineName string, tenantId string)
(statelang.StateMachine, error)
RegistryStateMachine(statelang.StateMachine) error
diff --git a/pkg/saga/statemachine/engine/core/status_decision.go
b/pkg/saga/statemachine/engine/core/status_decision.go
new file mode 100644
index 00000000..e95a6316
--- /dev/null
+++ b/pkg/saga/statemachine/engine/core/status_decision.go
@@ -0,0 +1,188 @@
+package core
+
+import (
+ "context"
+ "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+ "github.com/seata/seata-go/pkg/util/log"
+)
+
+type StatusDecisionStrategy interface {
+ // DecideOnEndState Determine state machine execution status when
executing to EndState
+ DecideOnEndState(ctx context.Context, processContext ProcessContext,
+ stateMachineInstance statelang.StateMachineInstance, exp error)
error
+ // DecideOnTaskStateFail Determine state machine execution status when
executing TaskState error
+ DecideOnTaskStateFail(ctx context.Context, processContext
ProcessContext,
+ stateMachineInstance statelang.StateMachineInstance, exp error)
error
+ // DecideMachineForwardExecutionStatus Determine the forward execution
state of the state machine
+ DecideMachineForwardExecutionStatus(ctx context.Context,
+ stateMachineInstance statelang.StateMachineInstance, exp error,
specialPolicy bool) error
+}
+
+type DefaultStatusDecisionStrategy struct {
+}
+
+func NewDefaultStatusDecisionStrategy() *DefaultStatusDecisionStrategy {
+ return &DefaultStatusDecisionStrategy{}
+}
+
+func (d DefaultStatusDecisionStrategy) DecideOnEndState(ctx context.Context,
processContext ProcessContext,
+ stateMachineInstance statelang.StateMachineInstance, exp error) error {
+ if statelang.RU == stateMachineInstance.CompensationStatus() {
+ compensationHolder := GetCurrentCompensationHolder(ctx,
processContext, true)
+ if err := decideMachineCompensateStatus(ctx,
stateMachineInstance, compensationHolder); err != nil {
+ return err
+ }
+ } else {
+ failEndStateFlag, ok :=
processContext.GetVariable(constant.VarNameFailEndStateFlag).(bool)
+ if !ok {
+ failEndStateFlag = false
+ }
+ if _, err := decideMachineForwardExecutionStatus(ctx,
stateMachineInstance, exp, failEndStateFlag); err != nil {
+ return err
+ }
+ }
+
+ if stateMachineInstance.CompensationStatus() != "" &&
constant.OperationNameForward ==
+
processContext.GetVariable(constant.VarNameOperationName).(string) &&
statelang.SU == stateMachineInstance.Status() {
+ stateMachineInstance.SetCompensationStatus(statelang.FA)
+ }
+
+ log.Debugf("StateMachine Instance[id:%s,name:%s] execute finish with
status[%s], compensation status [%s].",
+ stateMachineInstance.ID(),
stateMachineInstance.StateMachine().Name(),
+ stateMachineInstance.Status(),
stateMachineInstance.CompensationStatus())
+
+ return nil
+}
+
+func decideMachineCompensateStatus(ctx context.Context, stateMachineInstance
statelang.StateMachineInstance, compensationHolder *CompensationHolder) error {
+ if stateMachineInstance.Status() == "" || statelang.RU ==
stateMachineInstance.Status() {
+ stateMachineInstance.SetStatus(statelang.UN)
+ }
+ if !compensationHolder.StateStackNeedCompensation().Empty() {
+ hasCompensateSUorUN := false
+ compensationHolder.StatesForCompensation().Range(
+ func(key, value any) bool {
+ stateInstance, ok :=
value.(statelang.StateInstance)
+ if !ok {
+ return false
+ }
+ if statelang.UN == stateInstance.Status() ||
statelang.SU == stateInstance.Status() {
+ hasCompensateSUorUN = true
+ return true
+ }
+ return false
+ })
+
+ if hasCompensateSUorUN {
+ stateMachineInstance.SetCompensationStatus(statelang.UN)
+ } else {
+ stateMachineInstance.SetCompensationStatus(statelang.FA)
+ }
+ } else {
+ hasCompensateError := false
+ compensationHolder.StatesForCompensation().Range(
+ func(key, value any) bool {
+ stateInstance, ok :=
value.(statelang.StateInstance)
+ if !ok {
+ return false
+ }
+ if statelang.SU != stateInstance.Status() {
+ hasCompensateError = true
+ return true
+ }
+ return false
+ })
+
+ if hasCompensateError {
+ stateMachineInstance.SetCompensationStatus(statelang.UN)
+ } else {
+ stateMachineInstance.SetCompensationStatus(statelang.SU)
+ }
+ }
+ return nil
+}
+
+func decideMachineForwardExecutionStatus(ctx context.Context,
stateMachineInstance statelang.StateMachineInstance, exp error, specialPolicy
bool) (bool, error) {
+ result := false
+
+ if stateMachineInstance.Status() == "" || statelang.RU ==
stateMachineInstance.Status() {
+ result = true
+ stateList := stateMachineInstance.StateList()
+
setMachineStatusBasedOnStateListAndException(stateMachineInstance, stateList,
exp)
+
+ if specialPolicy && statelang.SU ==
stateMachineInstance.Status() {
+ for _, stateInstance := range
stateMachineInstance.StateList() {
+ if !stateInstance.IsIgnoreStatus() &&
(stateInstance.IsForUpdate() || stateInstance.IsForCompensation()) {
+
stateMachineInstance.SetStatus(statelang.UN)
+ break
+ }
+ }
+ if statelang.SU == stateMachineInstance.Status() {
+ stateMachineInstance.SetStatus(statelang.FA)
+ }
+ }
+ }
+ return result, nil
+}
+
+func setMachineStatusBasedOnStateListAndException(stateMachineInstance
statelang.StateMachineInstance,
+ stateList []statelang.StateInstance, exp error) {
+ hasSetStatus := false
+ hasSuccessUpdateService := false
+ if stateList != nil && len(stateList) > 0 {
+ hasUnsuccessService := false
+
+ for i := len(stateList) - 1; i >= 0; i-- {
+ stateInstance := stateList[i]
+
+ if stateInstance.IsIgnoreStatus() ||
stateInstance.IsForCompensation() {
+ continue
+ }
+ if statelang.UN == stateInstance.Status() {
+ stateMachineInstance.SetStatus(statelang.UN)
+ hasSetStatus = true
+ } else if statelang.SU == stateInstance.Status() {
+ if constant.StateTypeServiceTask ==
stateInstance.Type() {
+ if stateInstance.IsForUpdate() &&
!stateInstance.IsForCompensation() {
+ hasSuccessUpdateService = true
+ }
+ }
+ } else if statelang.SK == stateInstance.Status() {
+ // ignore
+ } else {
+ hasUnsuccessService = true
+ }
+ }
+
+ if !hasSetStatus && hasUnsuccessService {
+ if hasSuccessUpdateService {
+ stateMachineInstance.SetStatus(statelang.UN)
+ } else {
+ stateMachineInstance.SetStatus(statelang.FA)
+ }
+ hasSetStatus = true
+ }
+ }
+
+ if !hasSetStatus {
+ setMachineStatusBasedOnException(stateMachineInstance, exp,
hasSuccessUpdateService)
+ }
+}
+
+func setMachineStatusBasedOnException(stateMachineInstance
statelang.StateMachineInstance, exp error, hasSuccessUpdateService bool) {
+ //TODO implement me
+ panic("implement me")
+}
+
+func (d DefaultStatusDecisionStrategy) DecideOnTaskStateFail(ctx
context.Context, processContext ProcessContext,
+ stateMachineInstance statelang.StateMachineInstance, exp error) error {
+ //TODO implement me
+ panic("implement me")
+}
+
+func (d DefaultStatusDecisionStrategy) DecideMachineForwardExecutionStatus(ctx
context.Context,
+ stateMachineInstance statelang.StateMachineInstance, exp error,
specialPolicy bool) error {
+ //TODO implement me
+ panic("implement me")
+}
diff --git a/pkg/saga/statemachine/engine/utils.go
b/pkg/saga/statemachine/engine/core/utils.go
similarity index 84%
rename from pkg/saga/statemachine/engine/utils.go
rename to pkg/saga/statemachine/engine/core/utils.go
index 92b7aa10..8300c0eb 100644
--- a/pkg/saga/statemachine/engine/utils.go
+++ b/pkg/saga/statemachine/engine/core/utils.go
@@ -1,22 +1,22 @@
-package engine
+package core
import (
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
- "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
+ "github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl/process"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
)
// ProcessContextBuilder process_ctrl builder
type ProcessContextBuilder struct {
- processContext process_ctrl.ProcessContext
+ processContext ProcessContext
}
func NewProcessContextBuilder() *ProcessContextBuilder {
- processContextImpl := process_ctrl.NewProcessContextImpl()
+ processContextImpl := NewProcessContextImpl()
return &ProcessContextBuilder{processContextImpl}
}
-func (p *ProcessContextBuilder) WithProcessType(processType
process_ctrl.ProcessType) *ProcessContextBuilder {
+func (p *ProcessContextBuilder) WithProcessType(processType
process.ProcessType) *ProcessContextBuilder {
p.processContext.SetVariable(constant.VarNameProcessType, processType)
return p
}
@@ -34,7 +34,7 @@ func (p *ProcessContextBuilder) WithAsyncCallback(callBack
CallBack) *ProcessCon
return p
}
-func (p *ProcessContextBuilder) WithInstruction(instruction
process_ctrl.Instruction) *ProcessContextBuilder {
+func (p *ProcessContextBuilder) WithInstruction(instruction Instruction)
*ProcessContextBuilder {
if instruction != nil {
p.processContext.SetInstruction(instruction)
}
@@ -81,6 +81,6 @@ func (p *ProcessContextBuilder) WithIsAsyncExecution(async
bool) *ProcessContext
return p
}
-func (p *ProcessContextBuilder) Build() process_ctrl.ProcessContext {
+func (p *ProcessContextBuilder) Build() ProcessContext {
return p.processContext
}
diff --git a/pkg/saga/statemachine/engine/default_statemachine_config.go
b/pkg/saga/statemachine/engine/default_statemachine_config.go
deleted file mode 100644
index 2d7b1d8e..00000000
--- a/pkg/saga/statemachine/engine/default_statemachine_config.go
+++ /dev/null
@@ -1,123 +0,0 @@
-package engine
-
-import (
- "github.com/seata/seata-go/pkg/saga/statemachine/engine/events"
- "github.com/seata/seata-go/pkg/saga/statemachine/engine/expr"
- "github.com/seata/seata-go/pkg/saga/statemachine/engine/invoker"
- "github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence"
- "github.com/seata/seata-go/pkg/saga/statemachine/engine/status_decision"
- "github.com/seata/seata-go/pkg/saga/statemachine/engine/store"
-)
-
-const (
- DefaultTransOperTimeout = 60000 * 30
- DefaultServiceInvokeTimeout = 60000 * 5
-)
-
-type DefaultStateMachineConfig struct {
- // Configuration
- transOperationTimeout int
- serviceInvokeTimeout int
- charset string
- defaultTenantId string
-
- // Components
-
- // Store related components
- stateLogRepository store.StateLogRepository
- stateLogStore store.StateLogStore
- stateLangStore store.StateLangStore
- stateMachineRepository store.StateMachineRepository
-
- // Expression related components
- expressionFactoryManager expr.ExpressionFactoryManager
- expressionResolver expr.ExpressionResolver
-
- // Invoker related components
- serviceInvokerManager invoker.ServiceInvokerManager
- scriptInvokerManager invoker.ScriptInvokerManager
-
- // Other components
- statusDecisionStrategy status_decision.StatusDecisionStrategy
- seqGenerator sequence.SeqGenerator
-}
-
-func (c *DefaultStateMachineConfig) StateLogRepository()
store.StateLogRepository {
- return c.stateLogRepository
-}
-
-func (c *DefaultStateMachineConfig) StateMachineRepository()
store.StateMachineRepository {
- return c.stateMachineRepository
-}
-
-func (c *DefaultStateMachineConfig) StateLogStore() store.StateLogStore {
- return c.stateLogStore
-}
-
-func (c *DefaultStateMachineConfig) StateLangStore() store.StateLangStore {
- return c.stateLangStore
-}
-
-func (c *DefaultStateMachineConfig) ExpressionFactoryManager()
expr.ExpressionFactoryManager {
- return c.expressionFactoryManager
-}
-
-func (c *DefaultStateMachineConfig) ExpressionResolver()
expr.ExpressionResolver {
- return c.expressionResolver
-}
-
-func (c *DefaultStateMachineConfig) SeqGenerator() sequence.SeqGenerator {
- return c.seqGenerator
-}
-
-func (c *DefaultStateMachineConfig) StatusDecisionStrategy()
status_decision.StatusDecisionStrategy {
- return c.statusDecisionStrategy
-}
-
-func (c *DefaultStateMachineConfig) EventPublisher() events.EventPublisher {
- //TODO implement me
- panic("implement me")
-}
-
-func (c *DefaultStateMachineConfig) AsyncEventPublisher()
events.EventPublisher {
- //TODO implement me
- panic("implement me")
-}
-
-func (c *DefaultStateMachineConfig) ServiceInvokerManager()
invoker.ServiceInvokerManager {
- return c.serviceInvokerManager
-}
-
-func (c *DefaultStateMachineConfig) ScriptInvokerManager()
invoker.ScriptInvokerManager {
- return c.scriptInvokerManager
-}
-
-func (c *DefaultStateMachineConfig) CharSet() string {
- return c.charset
-}
-
-func (c *DefaultStateMachineConfig) SetCharSet(charset string) {
- c.charset = charset
-}
-
-func (c *DefaultStateMachineConfig) DefaultTenantId() string {
- return c.defaultTenantId
-}
-
-func (c *DefaultStateMachineConfig) TransOperationTimeout() int {
- return c.transOperationTimeout
-}
-
-func (c *DefaultStateMachineConfig) ServiceInvokeTimeout() int {
- return c.serviceInvokeTimeout
-}
-
-func NewDefaultStateMachineConfig() *DefaultStateMachineConfig {
- c := &DefaultStateMachineConfig{
- transOperationTimeout: DefaultTransOperTimeout,
- serviceInvokeTimeout: DefaultServiceInvokeTimeout,
- charset: "UTF-8",
- defaultTenantId: "000001",
- }
- return c
-}
diff --git a/pkg/saga/statemachine/engine/events/event_bus.go
b/pkg/saga/statemachine/engine/events/event_bus.go
deleted file mode 100644
index 77a1e616..00000000
--- a/pkg/saga/statemachine/engine/events/event_bus.go
+++ /dev/null
@@ -1,49 +0,0 @@
-package events
-
-import (
- "context"
-)
-
-type EventBus interface {
- Offer(ctx context.Context, event Event) (bool, error)
-
- RegisterEventConsumer(consumer EventConsumer)
-}
-
-type BaseEventBus struct {
- eventConsumerList []EventConsumer
-}
-
-func (b *BaseEventBus) RegisterEventConsumer(consumer EventConsumer) {
- if b.eventConsumerList == nil {
- b.eventConsumerList = make([]EventConsumer, 0)
- }
- b.eventConsumerList = append(b.eventConsumerList, consumer)
-}
-
-func (b *BaseEventBus) GetEventConsumerList(event Event) []EventConsumer {
- var acceptedConsumerList = make([]EventConsumer, 0)
- for i := range b.eventConsumerList {
- eventConsumer := b.eventConsumerList[i]
- if eventConsumer.Accept(event) {
- acceptedConsumerList = append(acceptedConsumerList,
eventConsumer)
- }
- }
- return acceptedConsumerList
-}
-
-type DirectEventBus struct {
- BaseEventBus
-}
-
-func (d DirectEventBus) Offer(ctx context.Context, event Event) (bool, error) {
- eventConsumerList := d.GetEventConsumerList(event)
- if len(eventConsumerList) == 0 {
- //TODO logger
- return false, nil
- }
-
- //processContext, _ := event.(process_ctrl.ProcessContext)
-
- return true, nil
-}
diff --git a/pkg/saga/statemachine/engine/exception/exception.go
b/pkg/saga/statemachine/engine/exception/exception.go
new file mode 100644
index 00000000..f5c58239
--- /dev/null
+++ b/pkg/saga/statemachine/engine/exception/exception.go
@@ -0,0 +1,54 @@
+package exception
+
+import "github.com/seata/seata-go/pkg/util/errors"
+
+type EngineExecutionException struct {
+ errors.SeataError
+ stateName string
+ stateMachineName string
+ stateMachineInstanceId string
+ stateInstanceId string
+}
+
+func NewEngineExecutionException(code errors.TransactionErrorCode, msg string,
parent error) *EngineExecutionException {
+ seataError := errors.New(code, msg, parent)
+ return &EngineExecutionException{
+ SeataError: *seataError,
+ }
+}
+
+func (e *EngineExecutionException) StateName() string {
+ return e.stateName
+}
+
+func (e *EngineExecutionException) SetStateName(stateName string) {
+ e.stateName = stateName
+}
+
+func (e *EngineExecutionException) StateMachineName() string {
+ return e.stateMachineName
+}
+
+func (e *EngineExecutionException) SetStateMachineName(stateMachineName
string) {
+ e.stateMachineName = stateMachineName
+}
+
+func (e *EngineExecutionException) StateMachineInstanceId() string {
+ return e.stateMachineInstanceId
+}
+
+func (e *EngineExecutionException)
SetStateMachineInstanceId(stateMachineInstanceId string) {
+ e.stateMachineInstanceId = stateMachineInstanceId
+}
+
+func (e *EngineExecutionException) StateInstanceId() string {
+ return e.stateInstanceId
+}
+
+func (e *EngineExecutionException) SetStateInstanceId(stateInstanceId string) {
+ e.stateInstanceId = stateInstanceId
+}
+
+type ForwardInvalidException struct {
+ EngineExecutionException
+}
diff --git a/pkg/saga/statemachine/engine/expr/expression.go
b/pkg/saga/statemachine/engine/expr/expression.go
index 9706cda5..767c49da 100644
--- a/pkg/saga/statemachine/engine/expr/expression.go
+++ b/pkg/saga/statemachine/engine/expr/expression.go
@@ -1,10 +1,93 @@
package expr
+import (
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence"
+ "strings"
+)
+
+const DefaultExpressionType string = "Default"
+
type ExpressionResolver interface {
+ Expression(expressionStr string) Expression
+ ExpressionFactoryManager() ExpressionFactoryManager
+ SetExpressionFactoryManager(expressionFactoryManager
ExpressionFactoryManager)
}
type Expression interface {
+ Value(elContext any) any
+ SetValue(value any, elContext any)
+ ExpressionString() string
+}
+
+type ExpressionFactory interface {
+ CreateExpression(expression string) Expression
}
type ExpressionFactoryManager struct {
+ expressionFactoryMap map[string]ExpressionFactory
+}
+
+func NewExpressionFactoryManager() *ExpressionFactoryManager {
+ return &ExpressionFactoryManager{
+ expressionFactoryMap: make(map[string]ExpressionFactory),
+ }
+}
+
+func (e *ExpressionFactoryManager) GetExpressionFactory(expressionType string)
ExpressionFactory {
+ if strings.TrimSpace(expressionType) == "" {
+ expressionType = DefaultExpressionType
+ }
+ return e.expressionFactoryMap[expressionType]
+}
+
+func (e *ExpressionFactoryManager)
SetExpressionFactoryMap(expressionFactoryMap map[string]ExpressionFactory) {
+ for k, v := range expressionFactoryMap {
+ e.expressionFactoryMap[k] = v
+ }
+}
+
+func (e *ExpressionFactoryManager) PutExpressionFactory(expressionType string,
factory ExpressionFactory) {
+ e.expressionFactoryMap[expressionType] = factory
+}
+
+type SequenceExpression struct {
+ seqGenerator sequence.SeqGenerator
+ entity string
+ rule string
+}
+
+func (s *SequenceExpression) SeqGenerator() sequence.SeqGenerator {
+ return s.seqGenerator
+}
+
+func (s *SequenceExpression) SetSeqGenerator(seqGenerator
sequence.SeqGenerator) {
+ s.seqGenerator = seqGenerator
+}
+
+func (s *SequenceExpression) Entity() string {
+ return s.entity
+}
+
+func (s *SequenceExpression) SetEntity(entity string) {
+ s.entity = entity
+}
+
+func (s *SequenceExpression) Rule() string {
+ return s.rule
+}
+
+func (s *SequenceExpression) SetRule(rule string) {
+ s.rule = rule
+}
+
+func (s SequenceExpression) Value(elContext any) any {
+ return s.seqGenerator.GenerateId(s.entity, s.rule)
+}
+
+func (s SequenceExpression) SetValue(value any, elContext any) {
+
+}
+
+func (s SequenceExpression) ExpressionString() string {
+ return s.entity + "|" + s.rule
}
diff --git a/pkg/saga/statemachine/engine/process_ctrl/instruction.go
b/pkg/saga/statemachine/engine/process_ctrl/instruction.go
deleted file mode 100644
index 2f7c0a54..00000000
--- a/pkg/saga/statemachine/engine/process_ctrl/instruction.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package process_ctrl
-
-import (
- "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
-)
-
-type Instruction interface {
-}
-
-type StateInstruction struct {
- StateName string
- StateMachineName string
- TenantId string
- End bool
-}
-
-func (s StateInstruction) GetState(context ProcessContext) (statelang.State,
error) {
- //TODO implement me
- panic("implement me")
-}
-
-func NewStateInstruction(stateMachineName string, tenantId string)
*StateInstruction {
- return &StateInstruction{StateMachineName: stateMachineName, TenantId:
tenantId}
-}
diff --git a/pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go
b/pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go
deleted file mode 100644
index aae9fe89..00000000
--- a/pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go
+++ /dev/null
@@ -1,125 +0,0 @@
-package engine
-
-import (
- "context"
- "time"
-
- "github.com/pkg/errors"
- "github.com/seata/seata-go/pkg/saga/statemachine/constant"
- "github.com/seata/seata-go/pkg/saga/statemachine/engine/events"
- "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
- "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
-)
-
-type ProcessCtrlStateMachineEngine struct {
- StateMachineConfig StateMachineConfig
-}
-
-func (p ProcessCtrlStateMachineEngine) Start(ctx context.Context,
stateMachineName string, tenantId string, startParams map[string]interface{})
(statelang.StateMachineInstance, error) {
- return p.startInternal(ctx, stateMachineName, tenantId, "",
startParams, false, nil)
-}
-
-func (p ProcessCtrlStateMachineEngine) startInternal(ctx context.Context,
stateMachineName string, tenantId string, businessKey string, startParams
map[string]interface{}, async bool, callback CallBack)
(statelang.StateMachineInstance, error) {
- if tenantId == "" {
- tenantId = p.StateMachineConfig.DefaultTenantId()
- }
-
- stateMachineInstance, err := p.createMachineInstance(stateMachineName,
tenantId, businessKey, startParams)
- if err != nil {
- return nil, err
- }
-
- // Build the process_ctrl context.
- processContextBuilder := NewProcessContextBuilder().
- WithProcessType(process_ctrl.StateLang).
- WithOperationName(constant.OperationNameStart).
- WithAsyncCallback(callback).
-
WithInstruction(process_ctrl.NewStateInstruction(stateMachineName, tenantId)).
- WithStateMachineInstance(stateMachineInstance).
- WithStateMachineConfig(p.StateMachineConfig).
- WithStateMachineEngine(p).
- WithIsAsyncExecution(async)
-
- contextMap := p.copyMap(startParams)
-
- stateMachineInstance.SetContext(contextMap)
-
- processContext :=
processContextBuilder.WithStateMachineContextVariables(contextMap).Build()
-
- if stateMachineInstance.StateMachine().IsPersist() &&
p.StateMachineConfig.StateLogStore() != nil {
- err :=
p.StateMachineConfig.StateLogStore().RecordStateMachineStarted(ctx,
stateMachineInstance, processContext)
- if err != nil {
- return nil, err
- }
- }
-
- if stateMachineInstance.ID() == "" {
-
stateMachineInstance.SetID(p.StateMachineConfig.SeqGenerator().GenerateId(constant.SeqEntityStateMachineInst,
""))
- }
-
- var eventPublisher events.EventPublisher
- if async {
- eventPublisher = p.StateMachineConfig.AsyncEventPublisher()
- } else {
- eventPublisher = p.StateMachineConfig.EventPublisher()
- }
-
- _, err = eventPublisher.PushEvent(ctx, processContext)
- if err != nil {
- return nil, err
- }
-
- return stateMachineInstance, nil
-}
-
-// copyMap not deep copy, so best practice: Don’t pass by reference
-func (p ProcessCtrlStateMachineEngine) copyMap(startParams
map[string]interface{}) map[string]interface{} {
- copyMap := make(map[string]interface{}, len(startParams))
- for k, v := range startParams {
- copyMap[k] = v
- }
- return copyMap
-}
-
-func (p ProcessCtrlStateMachineEngine) createMachineInstance(stateMachineName
string, tenantId string, businessKey string, startParams
map[string]interface{}) (statelang.StateMachineInstance, error) {
- stateMachine, err :=
p.StateMachineConfig.StateMachineRepository().GetLastVersionStateMachine(stateMachineName,
tenantId)
- if err != nil {
- return nil, err
- }
-
- if stateMachine == nil {
- return nil, errors.New("StateMachine [" + stateMachineName + "]
is not exists")
- }
-
- stateMachineInstance := statelang.NewStateMachineInstanceImpl()
- stateMachineInstance.SetStateMachine(stateMachine)
- stateMachineInstance.SetTenantID(tenantId)
- stateMachineInstance.SetBusinessKey(businessKey)
- stateMachineInstance.SetStartParams(startParams)
- if startParams != nil {
- if businessKey != "" {
- startParams[constant.VarNameBusinesskey] = businessKey
- }
-
- if startParams[constant.VarNameParentId] != nil {
- parentId, ok :=
startParams[constant.VarNameParentId].(string)
- if !ok {
-
- }
- stateMachineInstance.SetParentID(parentId)
- delete(startParams, constant.VarNameParentId)
- }
- }
-
- stateMachineInstance.SetStatus(statelang.RU)
- stateMachineInstance.SetRunning(true)
-
- now := time.Now()
- stateMachineInstance.SetStartedTime(now)
- stateMachineInstance.SetUpdatedTime(now)
- return stateMachineInstance, nil
-}
-
-func NewProcessCtrlStateMachineEngine(stateMachineConfig StateMachineConfig)
*ProcessCtrlStateMachineEngine {
- return &ProcessCtrlStateMachineEngine{StateMachineConfig:
stateMachineConfig}
-}
diff --git a/pkg/saga/statemachine/engine/statemachine_engine.go
b/pkg/saga/statemachine/engine/statemachine_engine.go
deleted file mode 100644
index 6b3954c1..00000000
--- a/pkg/saga/statemachine/engine/statemachine_engine.go
+++ /dev/null
@@ -1,16 +0,0 @@
-package engine
-
-import (
- "context"
- "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
- "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
-)
-
-type StateMachineEngine interface {
- Start(ctx context.Context, stateMachineName string, tenantId string,
startParams map[string]interface{}) (statelang.StateMachineInstance, error)
-}
-
-type CallBack interface {
- OnFinished(ctx context.Context, context process_ctrl.ProcessContext,
stateMachineInstance statelang.StateMachineInstance)
- OnError(ctx context.Context, context process_ctrl.ProcessContext,
stateMachineInstance statelang.StateMachineInstance, err error)
-}
diff --git a/pkg/saga/statemachine/engine/statemachine_engine_test.go
b/pkg/saga/statemachine/engine/statemachine_engine_test.go
deleted file mode 100644
index c9ac2518..00000000
--- a/pkg/saga/statemachine/engine/statemachine_engine_test.go
+++ /dev/null
@@ -1,7 +0,0 @@
-package engine
-
-import "testing"
-
-func TestEngine(t *testing.T) {
-
-}
diff --git a/pkg/saga/statemachine/engine/status_decision/status_decision.go
b/pkg/saga/statemachine/engine/status_decision/status_decision.go
deleted file mode 100644
index 6def093e..00000000
--- a/pkg/saga/statemachine/engine/status_decision/status_decision.go
+++ /dev/null
@@ -1,4 +0,0 @@
-package status_decision
-
-type StatusDecisionStrategy interface {
-}
diff --git
a/pkg/saga/statemachine/process_ctrl/handlers/service_task_state_handler.go
b/pkg/saga/statemachine/process_ctrl/handlers/service_task_state_handler.go
new file mode 100644
index 00000000..28d60d0a
--- /dev/null
+++ b/pkg/saga/statemachine/process_ctrl/handlers/service_task_state_handler.go
@@ -0,0 +1,174 @@
+package handlers
+
+import (
+ "context"
+ "errors"
+ "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/core"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/exception"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
+ seataErrors "github.com/seata/seata-go/pkg/util/errors"
+ "github.com/seata/seata-go/pkg/util/log"
+)
+
+type ServiceTaskStateHandler struct {
+ interceptors []core.StateHandlerInterceptor
+}
+
+func NewServiceTaskStateHandler() *ServiceTaskStateHandler {
+ return &ServiceTaskStateHandler{}
+}
+
+func (s *ServiceTaskStateHandler) State() string {
+ return constant.StateTypeServiceTask
+}
+
+func (s *ServiceTaskStateHandler) Process(ctx context.Context, processContext
core.ProcessContext) error {
+ stateInstruction, ok :=
processContext.GetInstruction().(core.StateInstruction)
+ if !ok {
+ return errors.New("invalid state instruction from
processContext")
+ }
+ stateInterface, err := stateInstruction.GetState(processContext)
+ if err != nil {
+ return err
+ }
+ serviceTaskStateImpl, ok := stateInterface.(*state.ServiceTaskStateImpl)
+
+ serviceName := serviceTaskStateImpl.ServiceName()
+ methodName := serviceTaskStateImpl.ServiceMethod()
+ stateInstance, ok :=
processContext.GetVariable(constant.VarNameStateInst).(statelang.StateInstance)
+ if !ok {
+ return errors.New("invalid state instance type from
processContext")
+ }
+
+ // invoke service task and record
+ var result any
+ var resultErr error
+ handleResultErr := func(err error) {
+ log.Error("<<<<<<<<<<<<<<<<<<<<<< State[%s], ServiceName[%s],
Method[%s] Execute failed.",
+ serviceTaskStateImpl.Name(), serviceName, methodName,
err)
+
+ hierarchicalProcessContext, ok :=
processContext.(core.HierarchicalProcessContext)
+ if !ok {
+ return
+ }
+
hierarchicalProcessContext.SetVariable(constant.VarNameCurrentException, err)
+ core.HandleException(processContext,
serviceTaskStateImpl.AbstractTaskState, err)
+ }
+
+ input, ok :=
processContext.GetVariable(constant.VarNameInputParams).([]any)
+ if !ok {
+ handleResultErr(errors.New("invalid input params type from
processContext"))
+ return nil
+ }
+
+ stateInstance.SetStatus(statelang.RU)
+ log.Debugf(">>>>>>>>>>>>>>>>>>>>>> Start to execute State[%s],
ServiceName[%s], Method[%s], Input:%s",
+ serviceTaskStateImpl.Name(), serviceName, methodName, input)
+
+ if _, ok := stateInterface.(state.CompensateSubStateMachineState); ok {
+ // If it is the compensation of the subState machine,
+ // directly call the state machine's compensate method
+ stateMachineEngine, ok :=
processContext.GetVariable(constant.VarNameStateMachineEngine).(core.StateMachineEngine)
+ if !ok {
+ handleResultErr(errors.New("invalid stateMachineEngine
type from processContext"))
+ return nil
+ }
+
+ result, resultErr = s.compensateSubStateMachine(ctx,
processContext, serviceTaskStateImpl, input,
+ stateInstance, stateMachineEngine)
+ if resultErr != nil {
+ handleResultErr(resultErr)
+ return nil
+ }
+ } else {
+ stateMachineConfig, ok :=
processContext.GetVariable(constant.VarNameStateMachineConfig).(core.StateMachineConfig)
+ if !ok {
+ handleResultErr(errors.New("invalid stateMachineConfig
type from processContext"))
+ return nil
+ }
+
+ serviceInvoker :=
stateMachineConfig.ServiceInvokerManager().ServiceInvoker(serviceTaskStateImpl.ServiceType())
+ if serviceInvoker == nil {
+ resultErr =
exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
+ "No such
ServiceInvoker["+serviceTaskStateImpl.ServiceType()+"]", nil)
+ handleResultErr(resultErr)
+ return nil
+ }
+
+ result, resultErr = serviceInvoker.Invoke(ctx, input,
serviceTaskStateImpl)
+ if resultErr != nil {
+ handleResultErr(resultErr)
+ return nil
+ }
+ }
+
+ log.Debugf("<<<<<<<<<<<<<<<<<<<<<< State[%s], ServiceName[%s],
Method[%s] Execute finish. result: %s",
+ serviceTaskStateImpl.Name(), serviceName, methodName, result)
+
+ if result != nil {
+ stateInstance.SetOutputParams(result)
+ hierarchicalProcessContext, ok :=
processContext.(core.HierarchicalProcessContext)
+ if !ok {
+ handleResultErr(errors.New("invalid hierarchical
process context type from processContext"))
+ return nil
+ }
+
+
hierarchicalProcessContext.SetVariable(constant.VarNameOutputParams, result)
+ }
+
+ return nil
+}
+
+func (s *ServiceTaskStateHandler) StateHandlerInterceptorList()
[]core.StateHandlerInterceptor {
+ return s.interceptors
+}
+
+func (s *ServiceTaskStateHandler)
RegistryStateHandlerInterceptor(stateHandlerInterceptor
core.StateHandlerInterceptor) {
+ s.interceptors = append(s.interceptors, stateHandlerInterceptor)
+}
+
+func (s *ServiceTaskStateHandler) compensateSubStateMachine(ctx
context.Context, processContext core.ProcessContext,
+ serviceTaskState state.ServiceTaskState, input any, instance
statelang.StateInstance,
+ machineEngine core.StateMachineEngine) (any, error) {
+ subStateMachineParentId, ok :=
processContext.GetVariable(serviceTaskState.Name() +
constant.VarNameSubMachineParentId).(string)
+ if !ok {
+ return nil, errors.New("invalid subStateMachineParentId type
from processContext")
+ }
+
+ if subStateMachineParentId == "" {
+ return nil,
exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
+ "sub statemachine parentId is required", nil)
+ }
+
+ stateMachineConfig :=
processContext.GetVariable(constant.VarNameStateMachineConfig).(core.StateMachineConfig)
+ subInst, err :=
stateMachineConfig.StateLogStore().GetStateMachineInstanceByParentId(subStateMachineParentId)
+ if err != nil {
+ return nil, err
+ }
+
+ if subInst == nil || len(subInst) == 0 {
+ return nil,
exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
+ "cannot find sub statemachine instance by
parentId:"+subStateMachineParentId, nil)
+ }
+
+ subStateMachineInstId := subInst[0].ID()
+ log.Debugf(">>>>>>>>>>>>>>>>>>>>>> Start to compensate sub statemachine
[id:%s]", subStateMachineInstId)
+
+ startParams := make(map[string]any)
+
+ if inputList, ok := input.([]any); ok {
+ if len(inputList) > 0 {
+ startParams = inputList[0].(map[string]any)
+ }
+ } else if inputMap, ok := input.(map[string]any); ok {
+ startParams = inputMap
+ }
+
+ compensateInst, err := machineEngine.Compensate(ctx,
subStateMachineInstId, startParams)
+ instance.SetStatus(compensateInst.CompensationStatus())
+ log.Debugf("<<<<<<<<<<<<<<<<<<<<<< Compensate sub statemachine [id:%s]
finished with status[%s], "+"compensateState[%s]",
+ subStateMachineInstId, compensateInst.Status(),
compensateInst.CompensationStatus())
+ return compensateInst.EndParams(), nil
+}
diff --git a/pkg/saga/statemachine/engine/process_ctrl/process_type.go
b/pkg/saga/statemachine/process_ctrl/process/process_type.go
similarity index 82%
rename from pkg/saga/statemachine/engine/process_ctrl/process_type.go
rename to pkg/saga/statemachine/process_ctrl/process/process_type.go
index 14027a28..b1a8e030 100644
--- a/pkg/saga/statemachine/engine/process_ctrl/process_type.go
+++ b/pkg/saga/statemachine/process_ctrl/process/process_type.go
@@ -1,4 +1,4 @@
-package process_ctrl
+package process
type ProcessType string
diff --git a/pkg/saga/statemachine/statelang/parser/sub_state_machine_parser.go
b/pkg/saga/statemachine/statelang/parser/sub_state_machine_parser.go
index dc466da5..4de6e5de 100644
--- a/pkg/saga/statemachine/statelang/parser/sub_state_machine_parser.go
+++ b/pkg/saga/statemachine/statelang/parser/sub_state_machine_parser.go
@@ -64,7 +64,7 @@ func NewCompensateSubStateMachineStateParser()
*CompensateSubStateMachineStatePa
}
func (c CompensateSubStateMachineStateParser) StateType() string {
- return constant.CompensateSubMachine
+ return constant.StateTypeCompensateSubMachine
}
func (c CompensateSubStateMachineStateParser) Parse(stateName string, stateMap
map[string]interface{}) (statelang.State, error) {
diff --git a/pkg/saga/statemachine/statelang/state/loop_start_state.go
b/pkg/saga/statemachine/statelang/state/loop_start_state.go
new file mode 100644
index 00000000..e059431d
--- /dev/null
+++ b/pkg/saga/statemachine/statelang/state/loop_start_state.go
@@ -0,0 +1,22 @@
+package state
+
+import (
+ "github.com/seata/seata-go/pkg/saga/statemachine/constant"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+)
+
+type LoopStartState interface {
+ statelang.State
+}
+
+type LoopStartStateImpl struct {
+ *statelang.BaseState
+}
+
+func NewLoopStartStateImpl() *LoopStartStateImpl {
+ baseState := statelang.NewBaseState()
+ baseState.SetType(constant.StateTypeLoopStart)
+ return &LoopStartStateImpl{
+ BaseState: baseState,
+ }
+}
diff --git a/pkg/saga/statemachine/statelang/state/sub_state_machine.go
b/pkg/saga/statemachine/statelang/state/sub_state_machine.go
index bf0d96d5..78683a0a 100644
--- a/pkg/saga/statemachine/statelang/state/sub_state_machine.go
+++ b/pkg/saga/statemachine/statelang/state/sub_state_machine.go
@@ -56,7 +56,7 @@ func NewCompensateSubStateMachineStateImpl()
*CompensateSubStateMachineStateImpl
ServiceTaskStateImpl: NewServiceTaskStateImpl(),
hashcode: uuid.String(),
}
- c.SetType(constant.CompensateSubMachine)
+ c.SetType(constant.StateTypeCompensateSubMachine)
return c
}
diff --git a/pkg/saga/statemachine/statelang/state/task_state.go
b/pkg/saga/statemachine/statelang/state/task_state.go
index 734680ce..bc9880ac 100644
--- a/pkg/saga/statemachine/statelang/state/task_state.go
+++ b/pkg/saga/statemachine/statelang/state/task_state.go
@@ -38,7 +38,7 @@ type Loop interface {
type ExceptionMatch interface {
Exceptions() []string
-
+ // TODO: go dose not support get reflect.Type by string, not use it now
ExceptionTypes() []reflect.Type
SetExceptionTypes(ExceptionTypes []reflect.Type)
@@ -79,7 +79,9 @@ type AbstractTaskState struct {
loop Loop
catches []ExceptionMatch
input []interface{}
+ inputExpressions []interface{}
output map[string]interface{}
+ outputExpressions map[string]interface{}
compensatePersistModeUpdate bool
retryPersistModeUpdate bool
forCompensation bool
@@ -96,6 +98,22 @@ func NewAbstractTaskState() *AbstractTaskState {
}
}
+func (a *AbstractTaskState) InputExpressions() []interface{} {
+ return a.inputExpressions
+}
+
+func (a *AbstractTaskState) SetInputExpressions(inputExpressions
[]interface{}) {
+ a.inputExpressions = inputExpressions
+}
+
+func (a *AbstractTaskState) OutputExpressions() map[string]interface{} {
+ return a.outputExpressions
+}
+
+func (a *AbstractTaskState) SetOutputExpressions(outputExpressions
map[string]interface{}) {
+ a.outputExpressions = outputExpressions
+}
+
func (a *AbstractTaskState) Input() []interface{} {
return a.input
}
diff --git a/pkg/saga/statemachine/statelang/statemachine_instance.go
b/pkg/saga/statemachine/statelang/statemachine_instance.go
index 399d222b..9da71ee2 100644
--- a/pkg/saga/statemachine/statelang/statemachine_instance.go
+++ b/pkg/saga/statemachine/statelang/statemachine_instance.go
@@ -71,9 +71,9 @@ type StateMachineInstance interface {
SetBusinessKey(businessKey string)
- Error() error
+ Exception() error
- SetError(err error)
+ SetException(err error)
StartParams() map[string]interface{}
@@ -83,6 +83,8 @@ type StateMachineInstance interface {
SetEndParams(endParams map[string]interface{})
+ Context() map[string]interface{}
+
PutContext(key string, value interface{})
SetContext(context map[string]interface{})
@@ -115,7 +117,7 @@ type StateMachineInstanceImpl struct {
startedTime time.Time
endTime time.Time
updatedTime time.Time
- err error
+ exception error
serializedError interface{}
endParams map[string]interface{}
serializedEndParams interface{}
@@ -247,12 +249,12 @@ func (s *StateMachineInstanceImpl)
SetBusinessKey(businessKey string) {
s.businessKey = businessKey
}
-func (s *StateMachineInstanceImpl) Error() error {
- return s.err
+func (s *StateMachineInstanceImpl) Exception() error {
+ return s.exception
}
-func (s *StateMachineInstanceImpl) SetError(err error) {
- s.err = err
+func (s *StateMachineInstanceImpl) SetException(err error) {
+ s.exception = err
}
func (s *StateMachineInstanceImpl) StartParams() map[string]interface{} {
@@ -271,6 +273,10 @@ func (s *StateMachineInstanceImpl) SetEndParams(endParams
map[string]interface{}
s.endParams = endParams
}
+func (s *StateMachineInstanceImpl) Context() map[string]interface{} {
+ return s.context
+}
+
func (s *StateMachineInstanceImpl) PutContext(key string, value interface{}) {
s.contextMutex.Lock()
defer s.contextMutex.Unlock()
diff --git a/pkg/saga/statemachine/engine/store/db/db.go
b/pkg/saga/statemachine/store/db/db.go
similarity index 100%
rename from pkg/saga/statemachine/engine/store/db/db.go
rename to pkg/saga/statemachine/store/db/db.go
diff --git a/pkg/saga/statemachine/engine/store/db/db_test.go
b/pkg/saga/statemachine/store/db/db_test.go
similarity index 100%
rename from pkg/saga/statemachine/engine/store/db/db_test.go
rename to pkg/saga/statemachine/store/db/db_test.go
diff --git a/pkg/saga/statemachine/engine/store/db/statelang.go
b/pkg/saga/statemachine/store/db/statelang.go
similarity index 100%
rename from pkg/saga/statemachine/engine/store/db/statelang.go
rename to pkg/saga/statemachine/store/db/statelang.go
diff --git a/pkg/saga/statemachine/engine/store/db/statelang_test.go
b/pkg/saga/statemachine/store/db/statelang_test.go
similarity index 100%
rename from pkg/saga/statemachine/engine/store/db/statelang_test.go
rename to pkg/saga/statemachine/store/db/statelang_test.go
diff --git a/pkg/saga/statemachine/engine/store/db/statelog.go
b/pkg/saga/statemachine/store/db/statelog.go
similarity index 98%
rename from pkg/saga/statemachine/engine/store/db/statelog.go
rename to pkg/saga/statemachine/store/db/statelog.go
index 26b89ed6..23c3b551 100644
--- a/pkg/saga/statemachine/engine/store/db/statelog.go
+++ b/pkg/saga/statemachine/store/db/statelog.go
@@ -6,7 +6,7 @@ import (
"fmt"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
- "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/core"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/serializer"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
@@ -89,7 +89,7 @@ func NewStateLogStore(db *sql.DB, tablePrefix string)
*StateLogStore {
}
func (s *StateLogStore) RecordStateMachineStarted(ctx context.Context,
machineInstance statelang.StateMachineInstance,
- context process_ctrl.ProcessContext) error {
+ context core.ProcessContext) error {
if machineInstance == nil {
return nil
}
@@ -133,15 +133,15 @@ func (s *StateLogStore) RecordStateMachineStarted(ctx
context.Context, machineIn
}
func (s *StateLogStore) RecordStateMachineFinished(ctx context.Context,
machineInstance statelang.StateMachineInstance,
- context process_ctrl.ProcessContext) error {
+ context core.ProcessContext) error {
if machineInstance == nil {
return nil
}
endParams := machineInstance.EndParams()
- if statelang.SU == machineInstance.Status() && machineInstance.Error()
!= nil {
- machineInstance.SetError(nil)
+ if statelang.SU == machineInstance.Status() &&
machineInstance.Exception() != nil {
+ machineInstance.SetException(nil)
}
serializedEndParams, err := s.paramsSerializer.Serialize(endParams)
@@ -149,7 +149,7 @@ func (s *StateLogStore) RecordStateMachineFinished(ctx
context.Context, machineI
return err
}
machineInstance.SetSerializedEndParams(serializedEndParams)
- serializedError, err :=
s.errorSerializer.Serialize(machineInstance.Error())
+ serializedError, err :=
s.errorSerializer.Serialize(machineInstance.Exception())
if err != nil {
return err
}
@@ -171,7 +171,7 @@ func (s *StateLogStore) RecordStateMachineFinished(ctx
context.Context, machineI
}
func (s *StateLogStore) RecordStateMachineRestarted(ctx context.Context,
machineInstance statelang.StateMachineInstance,
- context process_ctrl.ProcessContext) error {
+ context core.ProcessContext) error {
if machineInstance == nil {
return nil
}
@@ -190,7 +190,7 @@ func (s *StateLogStore) RecordStateMachineRestarted(ctx
context.Context, machine
}
func (s *StateLogStore) RecordStateStarted(ctx context.Context, stateInstance
statelang.StateInstance,
- context process_ctrl.ProcessContext) error {
+ context core.ProcessContext) error {
if stateInstance == nil {
return nil
}
@@ -235,7 +235,7 @@ func (s *StateLogStore) RecordStateStarted(ctx
context.Context, stateInstance st
return nil
}
-func (s *StateLogStore) isUpdateMode(instance statelang.StateInstance, context
process_ctrl.ProcessContext) bool {
+func (s *StateLogStore) isUpdateMode(instance statelang.StateInstance, context
core.ProcessContext) bool {
//TODO implement me, add forward logic
return false
}
@@ -293,7 +293,7 @@ func (s *StateLogStore) getIdIndex(stateInstanceId string,
separator string) int
}
func (s *StateLogStore) RecordStateFinished(ctx context.Context, stateInstance
statelang.StateInstance,
- context process_ctrl.ProcessContext) error {
+ context core.ProcessContext) error {
if stateInstance == nil {
return nil
}
@@ -372,7 +372,7 @@ func (s *StateLogStore)
deserializeStateMachineParamsAndException(stateMachineIn
if err != nil {
return err
}
- stateMachineInstance.SetError(deserializedError)
+ stateMachineInstance.SetException(deserializedError)
}
serializedStartParams := stateMachineInstance.SerializedStartParams()
diff --git a/pkg/saga/statemachine/engine/store/db/statelog_test.go
b/pkg/saga/statemachine/store/db/statelog_test.go
similarity index 94%
rename from pkg/saga/statemachine/engine/store/db/statelog_test.go
rename to pkg/saga/statemachine/store/db/statelog_test.go
index 9044e6df..43767476 100644
--- a/pkg/saga/statemachine/engine/store/db/statelog_test.go
+++ b/pkg/saga/statemachine/store/db/statelog_test.go
@@ -5,19 +5,19 @@ import (
"fmt"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
- "github.com/seata/seata-go/pkg/saga/statemachine/engine"
- "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/core"
+ "github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl/process"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"github.com/stretchr/testify/assert"
"testing"
"time"
)
-func mockProcessContext(stateMachineName string, stateMachineInstance
statelang.StateMachineInstance) process_ctrl.ProcessContext {
- ctx := engine.NewProcessContextBuilder().
- WithProcessType(process_ctrl.StateLang).
+func mockProcessContext(stateMachineName string, stateMachineInstance
statelang.StateMachineInstance) core.ProcessContext {
+ ctx := core.NewProcessContextBuilder().
+ WithProcessType(process.StateLang).
WithOperationName(constant.OperationNameStart).
-
WithInstruction(process_ctrl.NewStateInstruction(stateMachineName, "000001")).
+ WithInstruction(core.NewStateInstruction(stateMachineName,
"000001")).
WithStateMachineInstance(stateMachineInstance).
Build()
return ctx
@@ -55,7 +55,7 @@ func TestStateLogStore_RecordStateMachineStarted(t
*testing.T) {
assert.Equal(t, expected.ID(), actual.ID())
assert.Equal(t, expected.MachineID(), actual.MachineID())
assert.Equal(t, fmt.Sprint(expected.StartParams()),
fmt.Sprint(actual.StartParams()))
- assert.Nil(t, actual.Error())
+ assert.Nil(t, actual.Exception())
assert.Nil(t, actual.SerializedError())
assert.Equal(t, expected.Status(), actual.Status())
assert.Equal(t, expected.StartedTime().UnixNano(),
actual.StartedTime().UnixNano())
@@ -73,7 +73,7 @@ func TestStateLogStore_RecordStateMachineFinished(t
*testing.T) {
err := stateLogStore.RecordStateMachineStarted(context.Background(),
expected, ctx)
assert.Nil(t, err)
expected.SetEndParams(map[string]any{"end": 100})
- expected.SetError(errors.New("this is a test error"))
+ expected.SetException(errors.New("this is a test error"))
expected.SetStatus(statelang.FA)
expected.SetEndTime(time.Now())
expected.SetRunning(false)
@@ -86,7 +86,7 @@ func TestStateLogStore_RecordStateMachineFinished(t
*testing.T) {
assert.Equal(t, expected.ID(), actual.ID())
assert.Equal(t, expected.MachineID(), actual.MachineID())
assert.Equal(t, fmt.Sprint(expected.StartParams()),
fmt.Sprint(actual.StartParams()))
- assert.Equal(t, "this is a test error", actual.Error().Error())
+ assert.Equal(t, "this is a test error", actual.Exception().Error())
assert.Equal(t, expected.Status(), actual.Status())
assert.Equal(t, expected.IsRunning(), actual.IsRunning())
assert.Equal(t, expected.StartedTime().UnixNano(),
actual.StartedTime().UnixNano())
@@ -240,7 +240,7 @@ func
TestStateLogStore_GetStateMachineInstanceByBusinessKey(t *testing.T) {
assert.Equal(t, expected.ID(), actual.ID())
assert.Equal(t, expected.MachineID(), actual.MachineID())
assert.Equal(t, fmt.Sprint(expected.StartParams()),
fmt.Sprint(actual.StartParams()))
- assert.Nil(t, actual.Error())
+ assert.Nil(t, actual.Exception())
assert.Nil(t, actual.SerializedError())
assert.Equal(t, expected.Status(), actual.Status())
assert.Equal(t, expected.StartedTime().UnixNano(),
actual.StartedTime().UnixNano())
@@ -269,9 +269,9 @@ func TestStateLogStore_GetStateMachineInstanceByParentId(t
*testing.T) {
actual := actualList[0]
assert.Equal(t, expected.ID(), actual.ID())
assert.Equal(t, expected.MachineID(), actual.MachineID())
- // no startParams, endParams and Error
+ // no startParams, endParams and Exception
assert.NotEqual(t, fmt.Sprint(expected.StartParams()),
fmt.Sprint(actual.StartParams()))
- assert.Nil(t, actual.Error())
+ assert.Nil(t, actual.Exception())
assert.Nil(t, actual.SerializedError())
assert.Equal(t, expected.Status(), actual.Status())
assert.Equal(t, expected.StartedTime().UnixNano(),
actual.StartedTime().UnixNano())
diff --git a/pkg/saga/statemachine/store/repository/state_machine_repository.go
b/pkg/saga/statemachine/store/repository/state_machine_repository.go
new file mode 100644
index 00000000..8a62cc49
--- /dev/null
+++ b/pkg/saga/statemachine/store/repository/state_machine_repository.go
@@ -0,0 +1,34 @@
+package repository
+
+import (
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+ "io"
+)
+
+type StateMachineRepositoryImpl struct {
+}
+
+func (s StateMachineRepositoryImpl) GetStateMachineById(stateMachineId string)
(statelang.StateMachine, error) {
+ //TODO implement me
+ panic("implement me")
+}
+
+func (s StateMachineRepositoryImpl)
GetStateMachineByNameAndTenantId(stateMachineName string, tenantId string)
(statelang.StateMachine, error) {
+ //TODO implement me
+ panic("implement me")
+}
+
+func (s StateMachineRepositoryImpl)
GetLastVersionStateMachine(stateMachineName string, tenantId string)
(statelang.StateMachine, error) {
+ //TODO implement me
+ panic("implement me")
+}
+
+func (s StateMachineRepositoryImpl) RegistryStateMachine(machine
statelang.StateMachine) error {
+ //TODO implement me
+ panic("implement me")
+}
+
+func (s StateMachineRepositoryImpl) RegistryStateMachineByReader(reader
io.Reader) error {
+ //TODO implement me
+ panic("implement me")
+}
diff --git a/pkg/tm/transaction_executor_test.go
b/pkg/tm/transaction_executor_test.go
index f0e513ad..c9bd1084 100644
--- a/pkg/tm/transaction_executor_test.go
+++ b/pkg/tm/transaction_executor_test.go
@@ -361,7 +361,7 @@ func TestBeginNewGtx(t *testing.T) {
assert.Equal(t, message.GlobalStatusBegin, *GetTxStatus(ctx))
// case return error
- err := errors.New("Mock Error")
+ err := errors.New("Mock Exception")
gomonkey.ApplyMethod(reflect.TypeOf(GetGlobalTransactionManager()),
"Begin",
func(_ *GlobalTransactionManager, ctx context.Context, timeout
time.Duration) error {
return err
diff --git a/pkg/util/collection/collection.go
b/pkg/util/collection/collection.go
index 5149e1a2..4dc04f7c 100644
--- a/pkg/util/collection/collection.go
+++ b/pkg/util/collection/collection.go
@@ -17,7 +17,10 @@
package collection
-import "strings"
+import (
+ "container/list"
+ "strings"
+)
const (
KvSplit = "="
@@ -81,3 +84,42 @@ func DecodeMap(data []byte) map[string]string {
return ctxMap
}
+
+type Stack struct {
+ list *list.List
+}
+
+func NewStack() *Stack {
+ list := list.New()
+ return &Stack{list}
+}
+
+func (stack *Stack) Push(value interface{}) {
+ stack.list.PushBack(value)
+}
+
+func (stack *Stack) Pop() interface{} {
+ e := stack.list.Back()
+ if e != nil {
+ stack.list.Remove(e)
+ return e.Value
+ }
+ return nil
+}
+
+func (stack *Stack) Peak() interface{} {
+ e := stack.list.Back()
+ if e != nil {
+ return e.Value
+ }
+
+ return nil
+}
+
+func (stack *Stack) Len() int {
+ return stack.list.Len()
+}
+
+func (stack *Stack) Empty() bool {
+ return stack.list.Len() == 0
+}
diff --git a/pkg/util/collection/collection_test.go
b/pkg/util/collection/collection_test.go
index 74df03cd..10a9e20f 100644
--- a/pkg/util/collection/collection_test.go
+++ b/pkg/util/collection/collection_test.go
@@ -57,3 +57,26 @@ func TestEncodeDecodeMap(t *testing.T) {
})
}
}
+
+func TestStack(t *testing.T) {
+ stack := NewStack()
+ stack.Push(1)
+ stack.Push(2)
+ stack.Push(3)
+ stack.Push(4)
+
+ len := stack.Len()
+ if len != 4 {
+ t.Errorf("stack.Len() failed. Got %d, expected 4.", len)
+ }
+
+ value := stack.Peak().(int)
+ if value != 4 {
+ t.Errorf("stack.Peak() failed. Got %d, expected 4.", value)
+ }
+
+ value = stack.Pop().(int)
+ if value != 4 {
+ t.Errorf("stack.Pop() failed. Got %d, expected 4.", value)
+ }
+}
diff --git a/pkg/util/errors/code.go b/pkg/util/errors/code.go
index dfe50205..9b15e72a 100644
--- a/pkg/util/errors/code.go
+++ b/pkg/util/errors/code.go
@@ -100,4 +100,15 @@ const (
// FencePhaseError have fence phase but is not illegal value
FencePhaseError
+
+ // ObjectNotExists object not exists
+ ObjectNotExists
+ // StateMachineInstanceNotExists State machine instance not exists
+ StateMachineInstanceNotExists
+ // ContextVariableReplayFailed Context variable replay failed
+ ContextVariableReplayFailed
+ // InvalidParameter Context variable replay failed
+ InvalidParameter
+ // OperationDenied Operation denied
+ OperationDenied
)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]