This is an automated email from the ASF dual-hosted git repository.
yixia pushed a commit to branch feature/saga
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git
The following commit(s) were added to refs/heads/feature/saga by this push:
new 58fc3e13 feature: init saga framework (#635)
58fc3e13 is described below
commit 58fc3e13dffa176082f39a3a989b20b2752c47e0
Author: wt_better <[email protected]>
AuthorDate: Sat Dec 23 23:01:17 2023 +0800
feature: init saga framework (#635)
init saga framework
---
pkg/saga/readme.md | 31 ++
pkg/saga/statemachine/engine/contant.go | 20 ++
pkg/saga/statemachine/engine/events/event.go | 4 +
pkg/saga/statemachine/engine/events/event_bus.go | 49 +++
.../statemachine/engine/events/event_consumer.go | 29 ++
.../statemachine/engine/events/event_publisher.go | 19 ++
pkg/saga/statemachine/engine/expr/expression.go | 10 +
pkg/saga/statemachine/engine/invoker/invoker.go | 14 +
.../engine/process_ctrl/bussiness_processor.go | 92 ++++++
.../engine/process_ctrl/instruction/instruction.go | 25 ++
.../engine/process_ctrl/process_context.go | 284 ++++++++++++++++++
.../engine/process_ctrl/process_type.go | 7 +
.../engine/process_ctrl/statemachine_processor.go | 139 +++++++++
.../engine/process_ctrl_statemachine_engine.go | 124 ++++++++
pkg/saga/statemachine/engine/sequence/sequence.go | 5 +
pkg/saga/statemachine/engine/sequence/uuid.go | 14 +
.../statemachine/engine/statemachine_config.go | 44 +++
.../statemachine/engine/statemachine_engine.go | 16 +
.../engine/status_decision/status_decision.go | 4 +
.../engine/store/statemachine_store.go | 60 ++++
.../statelang/parser/choice_state_json_parser.go | 72 +++++
.../statelang/parser/statemachine_json_parser.go | 100 +++++++
.../parser/statemachine_json_parser_test.go | 13 +
.../statelang/parser/statemachine_parser.go | 104 +++++++
pkg/saga/statemachine/statelang/state.go | 71 +++++
.../statemachine/statelang/state/choice_state.go | 74 +++++
.../statemachine/statelang/state/task_state.go | 30 ++
pkg/saga/statemachine/statelang/state_instance.go | 330 +++++++++++++++++++++
pkg/saga/statemachine/statelang/statemachine.go | 261 ++++++++++++++++
.../statelang/statemachine_instance.go | 316 ++++++++++++++++++++
pkg/util/reflectx/unmarkshaler.go | 147 +++++++++
31 files changed, 2508 insertions(+)
diff --git a/pkg/saga/readme.md b/pkg/saga/readme.md
new file mode 100644
index 00000000..9aa593da
--- /dev/null
+++ b/pkg/saga/readme.md
@@ -0,0 +1,31 @@
+
+# seata saga
+
+未来计划有三种使用方式
+
+- 基于状态机引擎的 json
+ link: statemachine_engine#Start
+- stream builder
+ stateMachine.serviceTask().build().Start
+- 二阶段方式saga,类似tcc使用
+
+上面1、2是以来[statemachine](statemachine),状态机引擎实现的,3相对比较独立。
+
+
+状态机的实现在:saga-statemachine包中
+其中[statelang](statemachine%2Fstatelang)是状态机语言的解析,目前实现的是json解析方式,状态机语言可以参考:
+https://seata.io/docs/user/mode/saga
+
+状态机json执行的入口类是:[statemachine_engine.go](statemachine%2Fengine%2Fstatemachine_engine.go)
+
+下面简单说下engine中各个包的作用:
+events:saga的是基于事件处理的,其中是event、eventBus的实现
+expr:表达式声明、解析、执行
+invoker:声明了serviceInvoker、scriptInvoker等接口、task调用管理、执行都在这个包中,例如httpInvoker
+process_ctrl:状态机处理流程:上下文、执行、事件流转
+sequence:分布式id
+store:状态机存储接口、实现
+status_decision:状态机状态决策
+
+
+
diff --git a/pkg/saga/statemachine/engine/contant.go
b/pkg/saga/statemachine/engine/contant.go
new file mode 100644
index 00000000..9527fc29
--- /dev/null
+++ b/pkg/saga/statemachine/engine/contant.go
@@ -0,0 +1,20 @@
+package engine
+
+const (
+ VarNameProcessType string = "_ProcessType_"
+ VarNameOperationName string = "_operation_name_"
+ OperationNameStart string = "start"
+ VarNameAsyncCallback string = "_async_callback_"
+ VarNameStateMachineInst string = "_current_statemachine_instance_"
+ VarNameStateMachine string = "_current_statemachine_"
+ VarNameStateMachineEngine string = "_current_statemachine_engine_"
+ VarNameStateMachineConfig string = "_statemachine_config_"
+ VarNameStateMachineContext string = "context"
+ VarNameIsAsyncExecution string = "_is_async_execution_"
+ VarNameStateInst string = "_current_state_instance_"
+ SeqEntityStateMachineInst string = "STATE_MACHINE_INST"
+ VarNameBusinesskey string = "_business_key_"
+ VarNameParentId string = "_parent_id_"
+ StateTypeServiceTask string = "ServiceTask"
+ StateTypeChoice string = "Choice"
+)
diff --git a/pkg/saga/statemachine/engine/events/event.go
b/pkg/saga/statemachine/engine/events/event.go
new file mode 100644
index 00000000..8f142713
--- /dev/null
+++ b/pkg/saga/statemachine/engine/events/event.go
@@ -0,0 +1,4 @@
+package events
+
+type Event interface {
+}
diff --git a/pkg/saga/statemachine/engine/events/event_bus.go
b/pkg/saga/statemachine/engine/events/event_bus.go
new file mode 100644
index 00000000..77a1e616
--- /dev/null
+++ b/pkg/saga/statemachine/engine/events/event_bus.go
@@ -0,0 +1,49 @@
+package events
+
+import (
+ "context"
+)
+
+type EventBus interface {
+ Offer(ctx context.Context, event Event) (bool, error)
+
+ RegisterEventConsumer(consumer EventConsumer)
+}
+
+type BaseEventBus struct {
+ eventConsumerList []EventConsumer
+}
+
+func (b *BaseEventBus) RegisterEventConsumer(consumer EventConsumer) {
+ if b.eventConsumerList == nil {
+ b.eventConsumerList = make([]EventConsumer, 0)
+ }
+ b.eventConsumerList = append(b.eventConsumerList, consumer)
+}
+
+func (b *BaseEventBus) GetEventConsumerList(event Event) []EventConsumer {
+ var acceptedConsumerList = make([]EventConsumer, 0)
+ for i := range b.eventConsumerList {
+ eventConsumer := b.eventConsumerList[i]
+ if eventConsumer.Accept(event) {
+ acceptedConsumerList = append(acceptedConsumerList,
eventConsumer)
+ }
+ }
+ return acceptedConsumerList
+}
+
+type DirectEventBus struct {
+ BaseEventBus
+}
+
+func (d DirectEventBus) Offer(ctx context.Context, event Event) (bool, error) {
+ eventConsumerList := d.GetEventConsumerList(event)
+ if len(eventConsumerList) == 0 {
+ //TODO logger
+ return false, nil
+ }
+
+ //processContext, _ := event.(process_ctrl.ProcessContext)
+
+ return true, nil
+}
diff --git a/pkg/saga/statemachine/engine/events/event_consumer.go
b/pkg/saga/statemachine/engine/events/event_consumer.go
new file mode 100644
index 00000000..426da15e
--- /dev/null
+++ b/pkg/saga/statemachine/engine/events/event_consumer.go
@@ -0,0 +1,29 @@
+package events
+
+import (
+ "context"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
+)
+
+type EventConsumer interface {
+ Accept(event Event) bool
+
+ Process(ctx context.Context, event Event) error
+}
+
+type ProcessCtrlEventConsumer struct {
+}
+
+func (p ProcessCtrlEventConsumer) Accept(event Event) bool {
+ if event == nil {
+ return false
+ }
+
+ _, ok := event.(process_ctrl.ProcessContext)
+ return ok
+}
+
+func (p ProcessCtrlEventConsumer) Process(ctx context.Context, event Event)
error {
+ //TODO implement me
+ panic("implement me")
+}
diff --git a/pkg/saga/statemachine/engine/events/event_publisher.go
b/pkg/saga/statemachine/engine/events/event_publisher.go
new file mode 100644
index 00000000..1fbfaec2
--- /dev/null
+++ b/pkg/saga/statemachine/engine/events/event_publisher.go
@@ -0,0 +1,19 @@
+package events
+
+import "context"
+
+type EventPublisher interface {
+ PushEvent(ctx context.Context, event Event) (bool, error)
+}
+
+type ProcessCtrlEventPublisher struct {
+ eventBus EventBus
+}
+
+func NewProcessCtrlEventPublisher(eventBus EventBus)
*ProcessCtrlEventPublisher {
+ return &ProcessCtrlEventPublisher{eventBus: eventBus}
+}
+
+func (p ProcessCtrlEventPublisher) PushEvent(ctx context.Context, event Event)
(bool, error) {
+ return p.eventBus.Offer(ctx, event)
+}
diff --git a/pkg/saga/statemachine/engine/expr/expression.go
b/pkg/saga/statemachine/engine/expr/expression.go
new file mode 100644
index 00000000..9706cda5
--- /dev/null
+++ b/pkg/saga/statemachine/engine/expr/expression.go
@@ -0,0 +1,10 @@
+package expr
+
+type ExpressionResolver interface {
+}
+
+type Expression interface {
+}
+
+type ExpressionFactoryManager struct {
+}
diff --git a/pkg/saga/statemachine/engine/invoker/invoker.go
b/pkg/saga/statemachine/engine/invoker/invoker.go
new file mode 100644
index 00000000..8c2ec069
--- /dev/null
+++ b/pkg/saga/statemachine/engine/invoker/invoker.go
@@ -0,0 +1,14 @@
+package invoker
+
+type ScriptInvokerManager interface {
+}
+
+type ScriptInvoker interface {
+}
+
+type ServiceInvokerManager interface {
+}
+
+type ServiceInvoker interface {
+ Invoke()
+}
diff --git a/pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go
b/pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go
new file mode 100644
index 00000000..198092a1
--- /dev/null
+++ b/pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go
@@ -0,0 +1,92 @@
+package process_ctrl
+
+import (
+ "context"
+ "github.com/pkg/errors"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine"
+ "sync"
+)
+
+type BusinessProcessor interface {
+ Process(ctx context.Context, processContext ProcessContext) error
+
+ Route(ctx context.Context, processContext ProcessContext) error
+}
+
+type DefaultBusinessProcessor struct {
+ processHandlers map[string]ProcessHandler
+ routerHandlers map[string]RouterHandler
+ mu sync.RWMutex
+}
+
+func (d *DefaultBusinessProcessor) RegistryProcessHandler(processType
ProcessType, processHandler ProcessHandler) {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+
+ d.processHandlers[string(processType)] = processHandler
+}
+
+func (d *DefaultBusinessProcessor) RegistryRouterHandler(processType
ProcessType, routerHandler RouterHandler) {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+
+ d.routerHandlers[string(processType)] = routerHandler
+}
+
+func (d *DefaultBusinessProcessor) Process(ctx context.Context, processContext
ProcessContext) error {
+ processType := d.matchProcessType(processContext)
+
+ processHandler, err := d.getProcessHandler(processType)
+ if err != nil {
+ return err
+ }
+
+ return processHandler.Process(ctx, processContext)
+}
+
+func (d *DefaultBusinessProcessor) Route(ctx context.Context, processContext
ProcessContext) error {
+ processType := d.matchProcessType(processContext)
+
+ routerHandler, err := d.getRouterHandler(processType)
+ if err != nil {
+ return err
+ }
+
+ return routerHandler.Route(ctx, processContext)
+}
+
+func (d *DefaultBusinessProcessor) getProcessHandler(processType ProcessType)
(ProcessHandler, error) {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+ processHandler, ok := d.processHandlers[string(processType)]
+ if !ok {
+ return nil, errors.New("Cannot find process handler by type " +
string(processType))
+ }
+ return processHandler, nil
+}
+
+func (d *DefaultBusinessProcessor) getRouterHandler(processType ProcessType)
(RouterHandler, error) {
+ d.mu.RLock()
+ defer d.mu.RUnlock()
+ routerHandler, ok := d.routerHandlers[string(processType)]
+ if !ok {
+ return nil, errors.New("Cannot find router handler by type " +
string(processType))
+ }
+ return routerHandler, nil
+}
+
+func (d *DefaultBusinessProcessor) matchProcessType(processContext
ProcessContext) ProcessType {
+ ok := processContext.HasVariable(engine.VarNameProcessType)
+ if ok {
+ return
processContext.GetVariable(engine.VarNameProcessType).(ProcessType)
+ }
+ return StateLang
+}
+
+type ProcessHandler interface {
+ Process(ctx context.Context, processContext ProcessContext) error
+}
+
+type RouterHandler interface {
+ Route(ctx context.Context, processContext ProcessContext) error
+}
diff --git
a/pkg/saga/statemachine/engine/process_ctrl/instruction/instruction.go
b/pkg/saga/statemachine/engine/process_ctrl/instruction/instruction.go
new file mode 100644
index 00000000..1537dc67
--- /dev/null
+++ b/pkg/saga/statemachine/engine/process_ctrl/instruction/instruction.go
@@ -0,0 +1,25 @@
+package instruction
+
+import (
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+)
+
+type Instruction interface {
+}
+
+type StateInstruction struct {
+ StateName string
+ StateMachineName string
+ TenantId string
+ End bool
+}
+
+func (s StateInstruction) GetState(context process_ctrl.ProcessContext)
(statelang.State, error) {
+ //TODO implement me
+ panic("implement me")
+}
+
+func NewStateInstruction(stateMachineName string, tenantId string)
*StateInstruction {
+ return &StateInstruction{StateMachineName: stateMachineName, TenantId:
tenantId}
+}
diff --git a/pkg/saga/statemachine/engine/process_ctrl/process_context.go
b/pkg/saga/statemachine/engine/process_ctrl/process_context.go
new file mode 100644
index 00000000..6531b1af
--- /dev/null
+++ b/pkg/saga/statemachine/engine/process_ctrl/process_context.go
@@ -0,0 +1,284 @@
+package process_ctrl
+
+import (
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine"
+
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl/instruction"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+ "sync"
+)
+
+type ProcessContext interface {
+ GetVariable(name string) interface{}
+
+ SetVariable(name string, value interface{})
+
+ GetVariables() map[string]interface{}
+
+ SetVariables(variables map[string]interface{})
+
+ RemoveVariable(name string) interface{}
+
+ HasVariable(name string) bool
+
+ GetInstruction() instruction.Instruction
+
+ SetInstruction(instruction instruction.Instruction)
+}
+
+type HierarchicalProcessContext interface {
+ ProcessContext
+
+ GetVariableLocally(name string) interface{}
+
+ SetVariableLocally(name string, value interface{})
+
+ GetVariablesLocally() map[string]interface{}
+
+ SetVariablesLocally(variables map[string]interface{})
+
+ RemoveVariableLocally(name string) interface{}
+
+ HasVariableLocally(name string) bool
+
+ ClearLocally()
+}
+
+type ProcessContextImpl struct {
+ parent ProcessContext
+ mu sync.RWMutex
+ mp map[string]interface{}
+ instruction instruction.Instruction
+}
+
+func (p *ProcessContextImpl) GetVariable(name string) interface{} {
+ p.mu.RLock()
+ defer p.mu.RUnlock()
+
+ value, ok := p.mp[name]
+ if ok {
+ return value
+ }
+
+ if p.parent != nil {
+ return p.parent.GetVariable(name)
+ }
+
+ return nil
+}
+
+func (p *ProcessContextImpl) SetVariable(name string, value interface{}) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ _, ok := p.mp[name]
+ if ok {
+ p.mp[name] = value
+ } else {
+ if p.parent != nil {
+ p.parent.SetVariable(name, value)
+ } else {
+ p.mp[name] = value
+ }
+ }
+}
+
+func (p *ProcessContextImpl) GetVariables() map[string]interface{} {
+ p.mu.RLock()
+ defer p.mu.RUnlock()
+
+ newVariablesMap := make(map[string]interface{})
+ if p.parent != nil {
+ variables := p.parent.GetVariables()
+ for k, v := range variables {
+ newVariablesMap[k] = v
+ }
+ }
+
+ for k, v := range p.mp {
+ newVariablesMap[k] = v
+ }
+
+ return newVariablesMap
+}
+
+func (p *ProcessContextImpl) SetVariables(variables map[string]interface{}) {
+ for k, v := range variables {
+ p.SetVariable(k, v)
+ }
+}
+
+func (p *ProcessContextImpl) RemoveVariable(name string) interface{} {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ value, ok := p.mp[name]
+ if ok {
+ delete(p.mp, name)
+ return value
+ }
+
+ if p.parent != nil {
+ return p.parent.RemoveVariable(name)
+ }
+
+ return nil
+}
+
+func (p *ProcessContextImpl) HasVariable(name string) bool {
+ p.mu.RLock()
+ defer p.mu.RUnlock()
+
+ _, ok := p.mp[name]
+ if ok {
+ return true
+ }
+
+ if p.parent != nil {
+ return p.parent.HasVariable(name)
+ }
+
+ return false
+}
+
+func (p *ProcessContextImpl) GetInstruction() instruction.Instruction {
+ return p.instruction
+}
+
+func (p *ProcessContextImpl) SetInstruction(instruction
instruction.Instruction) {
+ p.instruction = instruction
+}
+
+func (p *ProcessContextImpl) GetVariableLocally(name string) interface{} {
+ p.mu.RLock()
+ defer p.mu.RUnlock()
+
+ value, _ := p.mp[name]
+ return value
+}
+
+func (p *ProcessContextImpl) SetVariableLocally(name string, value
interface{}) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ p.mp[name] = value
+}
+
+func (p *ProcessContextImpl) GetVariablesLocally() map[string]interface{} {
+ p.mu.RLock()
+ defer p.mu.RUnlock()
+
+ newVariablesMap := make(map[string]interface{}, len(p.mp))
+ for k, v := range p.mp {
+ newVariablesMap[k] = v
+ }
+ return newVariablesMap
+}
+
+func (p *ProcessContextImpl) SetVariablesLocally(variables
map[string]interface{}) {
+ for k, v := range variables {
+ p.SetVariableLocally(k, v)
+ }
+}
+
+func (p *ProcessContextImpl) RemoveVariableLocally(name string) interface{} {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ value, _ := p.mp[name]
+ delete(p.mp, name)
+ return value
+}
+
+func (p *ProcessContextImpl) HasVariableLocally(name string) bool {
+ p.mu.RLock()
+ defer p.mu.RUnlock()
+
+ _, ok := p.mp[name]
+ return ok
+}
+
+func (p *ProcessContextImpl) ClearLocally() {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ p.mp = map[string]interface{}{}
+}
+
+// ProcessContextBuilder process_ctrl builder
+type ProcessContextBuilder struct {
+ processContext ProcessContext
+}
+
+func NewProcessContextBuilder() *ProcessContextBuilder {
+ processContextImpl := &ProcessContextImpl{}
+ return &ProcessContextBuilder{processContextImpl}
+}
+
+func (p *ProcessContextBuilder) WithProcessType(processType ProcessType)
*ProcessContextBuilder {
+ p.processContext.SetVariable(engine.VarNameProcessType, processType)
+ return p
+}
+
+func (p *ProcessContextBuilder) WithOperationName(operationName string)
*ProcessContextBuilder {
+ p.processContext.SetVariable(engine.VarNameOperationName, operationName)
+ return p
+}
+
+func (p *ProcessContextBuilder) WithAsyncCallback(callBack engine.CallBack)
*ProcessContextBuilder {
+ if callBack != nil {
+ p.processContext.SetVariable(engine.VarNameAsyncCallback,
callBack)
+ }
+
+ return p
+}
+
+func (p *ProcessContextBuilder) WithInstruction(instruction
instruction.Instruction) *ProcessContextBuilder {
+ if instruction != nil {
+ p.processContext.SetInstruction(instruction)
+ }
+
+ return p
+}
+
+func (p *ProcessContextBuilder) WithStateMachineInstance(stateMachineInstance
statelang.StateMachineInstance) *ProcessContextBuilder {
+ if stateMachineInstance != nil {
+ p.processContext.SetVariable(engine.VarNameStateMachineInst,
stateMachineInstance)
+ p.processContext.SetVariable(engine.VarNameStateMachine,
stateMachineInstance.StateMachine())
+ }
+
+ return p
+}
+
+func (p *ProcessContextBuilder) WithStateMachineEngine(stateMachineEngine
engine.StateMachineEngine) *ProcessContextBuilder {
+ if stateMachineEngine != nil {
+ p.processContext.SetVariable(engine.VarNameStateMachineEngine,
stateMachineEngine)
+ }
+
+ return p
+}
+
+func (p *ProcessContextBuilder) WithStateMachineConfig(stateMachineConfig
engine.StateMachineConfig) *ProcessContextBuilder {
+ if stateMachineConfig != nil {
+ p.processContext.SetVariable(engine.VarNameStateMachineConfig,
stateMachineConfig)
+ }
+
+ return p
+}
+
+func (p *ProcessContextBuilder) WithStateMachineContextVariables(contextMap
map[string]interface{}) *ProcessContextBuilder {
+ if contextMap != nil {
+ p.processContext.SetVariable(engine.VarNameStateMachineContext,
contextMap)
+ }
+
+ return p
+}
+
+func (p *ProcessContextBuilder) WithIsAsyncExecution(async bool)
*ProcessContextBuilder {
+ p.processContext.SetVariable(engine.VarNameIsAsyncExecution, async)
+
+ return p
+}
+
+func (p *ProcessContextBuilder) Build() ProcessContext {
+ return p.processContext
+}
diff --git a/pkg/saga/statemachine/engine/process_ctrl/process_type.go
b/pkg/saga/statemachine/engine/process_ctrl/process_type.go
new file mode 100644
index 00000000..14027a28
--- /dev/null
+++ b/pkg/saga/statemachine/engine/process_ctrl/process_type.go
@@ -0,0 +1,7 @@
+package process_ctrl
+
+type ProcessType string
+
+const (
+ StateLang ProcessType = "STATE_LANG" // SEATA State Language
+)
diff --git
a/pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go
b/pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go
new file mode 100644
index 00000000..ce3479c3
--- /dev/null
+++ b/pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go
@@ -0,0 +1,139 @@
+package process_ctrl
+
+import (
+ "context"
+ "github.com/pkg/errors"
+
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl/instruction"
+ "sync"
+)
+
+type StateHandler interface {
+ State() string
+ ProcessHandler
+}
+
+type StateRouter interface {
+ State() string
+ RouterHandler
+}
+
+type InterceptAbleStateHandler interface {
+ StateHandler
+ StateHandlerInterceptorList() []StateHandlerInterceptor
+ RegistryStateHandlerInterceptor(stateHandlerInterceptor
StateHandlerInterceptor)
+}
+
+type StateHandlerInterceptor interface {
+ PreProcess(ctx context.Context, processContext ProcessContext) error
+ PostProcess(ctx context.Context, processContext ProcessContext) error
+}
+
+type StateMachineProcessHandler struct {
+ mp map[string]StateHandler
+ mu sync.RWMutex
+}
+
+func NewStateMachineProcessHandler() *StateMachineProcessHandler {
+ return &StateMachineProcessHandler{
+ mp: make(map[string]StateHandler),
+ }
+}
+
+func (s *StateMachineProcessHandler) Process(ctx context.Context,
processContext ProcessContext) error {
+ stateInstruction, _ :=
processContext.GetInstruction().(instruction.StateInstruction)
+
+ state, err := stateInstruction.GetState(processContext)
+ if err != nil {
+ return err
+ }
+
+ stateType := state.Type()
+ stateHandler := s.GetStateHandler(stateType)
+ if stateHandler == nil {
+ return errors.New("Not support [" + stateType + "] state
handler")
+ }
+
+ interceptAbleStateHandler, ok :=
stateHandler.(InterceptAbleStateHandler)
+
+ var stateHandlerInterceptorList []StateHandlerInterceptor
+ if ok {
+ stateHandlerInterceptorList =
interceptAbleStateHandler.StateHandlerInterceptorList()
+ }
+
+ if stateHandlerInterceptorList != nil &&
len(stateHandlerInterceptorList) > 0 {
+ for _, stateHandlerInterceptor := range
stateHandlerInterceptorList {
+ err = stateHandlerInterceptor.PreProcess(ctx,
processContext)
+ if err != nil {
+ return err
+ }
+ }
+ }
+
+ err = stateHandler.Process(ctx, processContext)
+ if err != nil {
+ return err
+ }
+
+ if stateHandlerInterceptorList != nil &&
len(stateHandlerInterceptorList) > 0 {
+ for _, stateHandlerInterceptor := range
stateHandlerInterceptorList {
+ err = stateHandlerInterceptor.PostProcess(ctx,
processContext)
+ if err != nil {
+ return err
+ }
+ }
+ }
+
+ return nil
+}
+
+func (s *StateMachineProcessHandler) GetStateHandler(stateType string)
StateHandler {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ return s.mp[stateType]
+}
+
+func (s *StateMachineProcessHandler) RegistryStateHandler(stateType string,
stateHandler StateHandler) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.mp == nil {
+ s.mp = make(map[string]StateHandler)
+ }
+ s.mp[stateType] = stateHandler
+}
+
+type StateMachineRouterHandler struct {
+ mu sync.RWMutex
+ mp map[string]StateRouter
+}
+
+func (s *StateMachineRouterHandler) Route(ctx context.Context, processContext
ProcessContext) error {
+ stateInstruction, _ :=
processContext.GetInstruction().(instruction.StateInstruction)
+
+ state, err := stateInstruction.GetState(processContext)
+ if err != nil {
+ return err
+ }
+
+ stateType := state.Type()
+ stateRouter := s.GetStateRouter(stateType)
+ if stateRouter == nil {
+ return errors.New("Not support [" + stateType + "] state
router")
+ }
+
+ return stateRouter.Route(ctx, processContext)
+}
+
+func (s *StateMachineRouterHandler) GetStateRouter(stateType string)
StateRouter {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ return s.mp[stateType]
+}
+
+func (s *StateMachineRouterHandler) RegistryStateRouter(stateType string,
stateRouter StateRouter) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.mp == nil {
+ s.mp = make(map[string]StateRouter)
+ }
+ s.mp[stateType] = stateRouter
+}
diff --git a/pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go
b/pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go
new file mode 100644
index 00000000..4c115df1
--- /dev/null
+++ b/pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go
@@ -0,0 +1,124 @@
+package engine
+
+import (
+ "context"
+ "github.com/pkg/errors"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/events"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
+
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl/instruction"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+ "time"
+)
+
+type ProcessCtrlStateMachineEngine struct {
+ StateMachineConfig StateMachineConfig
+}
+
+func (p ProcessCtrlStateMachineEngine) Start(ctx context.Context,
stateMachineName string, tenantId string, startParams map[string]interface{})
(statelang.StateMachineInstance, error) {
+ return p.startInternal(ctx, stateMachineName, tenantId, "",
startParams, false, nil)
+}
+
+func (p ProcessCtrlStateMachineEngine) startInternal(ctx context.Context,
stateMachineName string, tenantId string, businessKey string, startParams
map[string]interface{}, async bool, callback CallBack)
(statelang.StateMachineInstance, error) {
+ if tenantId == "" {
+ tenantId = p.StateMachineConfig.DefaultTenantId()
+ }
+
+ stateMachineInstance, err := p.createMachineInstance(stateMachineName,
tenantId, businessKey, startParams)
+ if err != nil {
+ return nil, err
+ }
+
+ // Build the process_ctrl context.
+ processContextBuilder := process_ctrl.NewProcessContextBuilder().
+ WithProcessType(process_ctrl.StateLang).
+ WithOperationName(OperationNameStart).
+ WithAsyncCallback(callback).
+
WithInstruction(instruction.NewStateInstruction(stateMachineName, tenantId)).
+ WithStateMachineInstance(stateMachineInstance).
+ WithStateMachineConfig(p.StateMachineConfig).
+ WithStateMachineEngine(p).
+ WithIsAsyncExecution(async)
+
+ contextMap := p.copyMap(startParams)
+
+ stateMachineInstance.SetContext(contextMap)
+
+ processContext :=
processContextBuilder.WithStateMachineContextVariables(contextMap).Build()
+
+ if stateMachineInstance.StateMachine().IsPersist() &&
p.StateMachineConfig.StateLogStore() != nil {
+ err :=
p.StateMachineConfig.StateLogStore().RecordStateMachineStarted(ctx,
stateMachineInstance, processContext)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ if stateMachineInstance.ID() == "" {
+
stateMachineInstance.SetID(p.StateMachineConfig.SeqGenerator().GenerateId(SeqEntityStateMachineInst,
""))
+ }
+
+ var eventPublisher events.EventPublisher
+ if async {
+ eventPublisher = p.StateMachineConfig.AsyncEventPublisher()
+ } else {
+ eventPublisher = p.StateMachineConfig.EventPublisher()
+ }
+
+ _, err = eventPublisher.PushEvent(ctx, processContext)
+ if err != nil {
+ return nil, err
+ }
+
+ return stateMachineInstance, nil
+}
+
+// copyMap not deep copy, so best practice: Don’t pass by reference
+func (p ProcessCtrlStateMachineEngine) copyMap(startParams
map[string]interface{}) map[string]interface{} {
+ copyMap := make(map[string]interface{}, len(startParams))
+ for k, v := range startParams {
+ copyMap[k] = v
+ }
+ return copyMap
+}
+
+func (p ProcessCtrlStateMachineEngine) createMachineInstance(stateMachineName
string, tenantId string, businessKey string, startParams
map[string]interface{}) (statelang.StateMachineInstance, error) {
+ stateMachine, err :=
p.StateMachineConfig.StateMachineRepository().GetLastVersionStateMachine(stateMachineName,
tenantId)
+ if err != nil {
+ return nil, err
+ }
+
+ if stateMachine == nil {
+ return nil, errors.New("StateMachine [" + stateMachineName + "]
is not exists")
+ }
+
+ stateMachineInstance := statelang.NewStateMachineInstanceImpl()
+ stateMachineInstance.SetStateMachine(stateMachine)
+ stateMachineInstance.SetTenantID(tenantId)
+ stateMachineInstance.SetBusinessKey(businessKey)
+ stateMachineInstance.SetStartParams(startParams)
+ if startParams != nil {
+ if businessKey != "" {
+ startParams[VarNameBusinesskey] = businessKey
+ }
+
+ if startParams[VarNameParentId] != nil {
+ parentId, ok := startParams[VarNameParentId].(string)
+ if !ok {
+
+ }
+ stateMachineInstance.SetParentID(parentId)
+ delete(startParams, VarNameParentId)
+ }
+ }
+
+ stateMachineInstance.SetStatus(statelang.RU)
+ stateMachineInstance.SetRunning(true)
+
+ now := time.Now()
+ stateMachineInstance.SetStartedTime(now)
+ stateMachineInstance.SetUpdatedTime(now)
+ return stateMachineInstance, nil
+}
+
+func NewProcessCtrlStateMachineEngine(stateMachineConfig StateMachineConfig)
*ProcessCtrlStateMachineEngine {
+ return &ProcessCtrlStateMachineEngine{StateMachineConfig:
stateMachineConfig}
+}
diff --git a/pkg/saga/statemachine/engine/sequence/sequence.go
b/pkg/saga/statemachine/engine/sequence/sequence.go
new file mode 100644
index 00000000..1a7fdae7
--- /dev/null
+++ b/pkg/saga/statemachine/engine/sequence/sequence.go
@@ -0,0 +1,5 @@
+package sequence
+
+type SeqGenerator interface {
+ GenerateId(entity string, ruleName string) string
+}
diff --git a/pkg/saga/statemachine/engine/sequence/uuid.go
b/pkg/saga/statemachine/engine/sequence/uuid.go
new file mode 100644
index 00000000..f3060dcf
--- /dev/null
+++ b/pkg/saga/statemachine/engine/sequence/uuid.go
@@ -0,0 +1,14 @@
+package sequence
+
+import "github.com/google/uuid"
+
+type UUIDSeqGenerator struct {
+}
+
+func NewUUIDSeqGenerator() *UUIDSeqGenerator {
+ return &UUIDSeqGenerator{}
+}
+
+func (U UUIDSeqGenerator) GenerateId(entity string, ruleName string) string {
+ return uuid.New().String()
+}
diff --git a/pkg/saga/statemachine/engine/statemachine_config.go
b/pkg/saga/statemachine/engine/statemachine_config.go
new file mode 100644
index 00000000..3f3c530b
--- /dev/null
+++ b/pkg/saga/statemachine/engine/statemachine_config.go
@@ -0,0 +1,44 @@
+package engine
+
+import (
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/events"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/expr"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/invoker"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/status_decision"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/store"
+)
+
+type StateMachineConfig interface {
+ StateLogRepository() store.StateLogRepository
+
+ StateMachineRepository() store.StateMachineRepository
+
+ StateLogStore() store.StateLogStore
+
+ StateLangStore() store.StateLangStore
+
+ ExpressionFactoryManager() expr.ExpressionFactoryManager
+
+ ExpressionResolver() expr.ExpressionResolver
+
+ SeqGenerator() sequence.SeqGenerator
+
+ StatusDecisionStrategy() status_decision.StatusDecisionStrategy
+
+ EventPublisher() events.EventPublisher
+
+ AsyncEventPublisher() events.EventPublisher
+
+ ServiceInvokerManager() invoker.ServiceInvokerManager
+
+ ScriptInvokerManager() invoker.ScriptInvokerManager
+
+ CharSet() string
+
+ DefaultTenantId() string
+
+ TransOperationTimeout() int
+
+ ServiceInvokeTimeout() int
+}
diff --git a/pkg/saga/statemachine/engine/statemachine_engine.go
b/pkg/saga/statemachine/engine/statemachine_engine.go
new file mode 100644
index 00000000..6b3954c1
--- /dev/null
+++ b/pkg/saga/statemachine/engine/statemachine_engine.go
@@ -0,0 +1,16 @@
+package engine
+
+import (
+ "context"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+)
+
+type StateMachineEngine interface {
+ Start(ctx context.Context, stateMachineName string, tenantId string,
startParams map[string]interface{}) (statelang.StateMachineInstance, error)
+}
+
+type CallBack interface {
+ OnFinished(ctx context.Context, context process_ctrl.ProcessContext,
stateMachineInstance statelang.StateMachineInstance)
+ OnError(ctx context.Context, context process_ctrl.ProcessContext,
stateMachineInstance statelang.StateMachineInstance, err error)
+}
diff --git a/pkg/saga/statemachine/engine/status_decision/status_decision.go
b/pkg/saga/statemachine/engine/status_decision/status_decision.go
new file mode 100644
index 00000000..6def093e
--- /dev/null
+++ b/pkg/saga/statemachine/engine/status_decision/status_decision.go
@@ -0,0 +1,4 @@
+package status_decision
+
+type StatusDecisionStrategy interface {
+}
diff --git a/pkg/saga/statemachine/engine/store/statemachine_store.go
b/pkg/saga/statemachine/engine/store/statemachine_store.go
new file mode 100644
index 00000000..8959df92
--- /dev/null
+++ b/pkg/saga/statemachine/engine/store/statemachine_store.go
@@ -0,0 +1,60 @@
+package store
+
+import (
+ "context"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+ "io"
+)
+
+type StateLogRepository interface {
+ GetStateMachineInstance(stateMachineInstanceId string)
(statelang.StateInstance, error)
+
+ GetStateMachineInstanceByBusinessKey(businessKey string, tenantId
string) (statelang.StateInstance, error)
+
+ GetStateMachineInstanceByParentId(parentId string)
([]statelang.StateMachineInstance, error)
+
+ GetStateInstance(stateInstanceId string, stateMachineInstanceId string)
(statelang.StateInstance, error)
+
+ GetStateInstanceListByMachineInstanceId(stateMachineInstanceId string)
([]statelang.StateInstance, error)
+}
+
+type StateLogStore interface {
+ RecordStateMachineStarted(ctx context.Context, machineInstance
statelang.StateMachineInstance, context process_ctrl.ProcessContext) error
+
+ RecordStateMachineFinished(ctx context.Context, machineInstance
statelang.StateMachineInstance, context process_ctrl.ProcessContext) error
+
+ RecordStateMachineRestarted(ctx context.Context, machineInstance
statelang.StateMachineInstance, context process_ctrl.ProcessContext) error
+
+ RecordStateStarted(ctx context.Context, stateInstance
statelang.StateInstance, context process_ctrl.ProcessContext) error
+
+ RecordStateFinished(ctx context.Context, stateInstance
statelang.StateInstance, context process_ctrl.ProcessContext) error
+
+ GetStateMachineInstance(stateMachineInstanceId string)
(statelang.StateInstance, error)
+
+ GetStateMachineInstanceByBusinessKey(businessKey string, tenantId
string) (statelang.StateInstance, error)
+
+ GetStateMachineInstanceByParentId(parentId string)
([]statelang.StateMachineInstance, error)
+
+ GetStateInstance(stateInstanceId string, stateMachineInstanceId string)
(statelang.StateInstance, error)
+
+ GetStateInstanceListByMachineInstanceId(stateMachineInstanceId string)
([]statelang.StateInstance, error)
+}
+
+type StateMachineRepository interface {
+ GetStateMachineById(stateMachineId string) (statelang.StateMachine,
error)
+
+ GetLastVersionStateMachine(stateMachineName string, tenantId string)
(statelang.StateMachine, error)
+
+ RegistryStateMachine(statelang.StateMachine) error
+
+ RegistryStateMachineByReader(reader io.Reader) error
+}
+
+type StateLangStore interface {
+ GetStateMachineById(stateMachineId string) (statelang.StateMachine,
error)
+
+ GetLastVersionStateMachine(stateMachineName string, tenantId string)
(statelang.StateMachine, error)
+
+ StoreStateMachine(stateMachine statelang.StateMachine) error
+}
diff --git a/pkg/saga/statemachine/statelang/parser/choice_state_json_parser.go
b/pkg/saga/statemachine/statelang/parser/choice_state_json_parser.go
new file mode 100644
index 00000000..04729c57
--- /dev/null
+++ b/pkg/saga/statemachine/statelang/parser/choice_state_json_parser.go
@@ -0,0 +1,72 @@
+package parser
+
+import (
+ "fmt"
+ "github.com/pkg/errors"
+ "github.com/seata/seata-go/pkg/saga/statemachine/engine"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
+)
+
+type ChoiceStateParser struct {
+ BaseStateParser
+}
+
+func NewChoiceStateParser() *ChoiceStateParser {
+ return &ChoiceStateParser{}
+}
+
+func (c ChoiceStateParser) StateType() string {
+ return engine.StateTypeChoice
+}
+
+func (c ChoiceStateParser) Parse(stateName string, stateMap
map[string]interface{}) (statelang.State, error) {
+ choiceState := state.NewChoiceStateImpl()
+ choiceState.SetName(stateName)
+
+ //parse Type
+ typeName, err := c.GetString(stateName, stateMap, "Type")
+ if err != nil {
+ return nil, err
+ }
+ choiceState.SetType(typeName)
+
+ //parse Default
+ defaultChoice, err := c.GetString(stateName, stateMap, "Default")
+ if err != nil {
+ return nil, err
+ }
+ choiceState.SetDefault(defaultChoice)
+
+ //parse Choices
+ slice, err := c.GetSlice(stateName, stateMap, "Choices")
+ if err != nil {
+ return nil, err
+ }
+
+ var choices []state.Choice
+ for i := range slice {
+ choiceValMap, ok := slice[i].(map[string]interface{})
+ if !ok {
+ return nil, errors.New(fmt.Sprintf("State [%s] Choices
element required struct", stateName))
+ }
+
+ choice := state.NewChoiceImpl()
+ expression, err := c.GetString(stateName, choiceValMap,
"Expression")
+ if err != nil {
+ return nil, err
+ }
+ choice.SetExpression(expression)
+
+ next, err := c.GetString(stateName, choiceValMap, "Next")
+ if err != nil {
+ return nil, err
+ }
+ choice.SetNext(next)
+
+ choices = append(choices, choice)
+ }
+ choiceState.SetChoices(choices)
+
+ return choiceState, nil
+}
diff --git a/pkg/saga/statemachine/statelang/parser/statemachine_json_parser.go
b/pkg/saga/statemachine/statelang/parser/statemachine_json_parser.go
new file mode 100644
index 00000000..c0977034
--- /dev/null
+++ b/pkg/saga/statemachine/statelang/parser/statemachine_json_parser.go
@@ -0,0 +1,100 @@
+package parser
+
+import (
+ "encoding/json"
+ "github.com/pkg/errors"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+)
+
+type JSONStateMachineParser struct {
+}
+
+func NewJSONStateMachineParser() *JSONStateMachineParser {
+ return &JSONStateMachineParser{}
+}
+
+func (stateMachineParser JSONStateMachineParser) GetType() string {
+ return "JSON"
+}
+
+func (stateMachineParser JSONStateMachineParser) Parse(content string)
(statelang.StateMachine, error) {
+ var stateMachineJsonObject StateMachineJsonObject
+
+ err := json.Unmarshal([]byte(content), &stateMachineJsonObject)
+ if err != nil {
+ return nil, err
+ }
+
+ stateMachine := statelang.NewStateMachineImpl()
+ stateMachine.SetName(stateMachineJsonObject.Name)
+ stateMachine.SetComment(stateMachineJsonObject.Comment)
+ stateMachine.SetVersion(stateMachineJsonObject.Version)
+ stateMachine.SetStartState(stateMachineJsonObject.StartState)
+ stateMachine.SetPersist(stateMachineJsonObject.Persist)
+
+ if stateMachineJsonObject.Type != "" {
+ stateMachine.SetType(stateMachineJsonObject.Type)
+ }
+
+ if stateMachineJsonObject.RecoverStrategy != "" {
+ recoverStrategy, ok :=
statelang.ValueOfRecoverStrategy(stateMachineJsonObject.RecoverStrategy)
+ if !ok {
+ return nil, errors.New("Not support " +
stateMachineJsonObject.RecoverStrategy)
+ }
+ stateMachine.SetRecoverStrategy(recoverStrategy)
+ }
+
+ stateParserFactory := NewDefaultStateParserFactory()
+ stateParserFactory.InitDefaultStateParser()
+ for stateName, v := range stateMachineJsonObject.States {
+ stateMap, ok := v.(map[string]interface{})
+ if !ok {
+ return nil, errors.New("State [" + stateName + "]
scheme illegal, required map")
+ }
+
+ stateType, ok := stateMap["Type"].(string)
+ if !ok {
+ return nil, errors.New("State [" + stateName + "] Type
illegal, required string")
+ }
+
+ //stateMap
+ stateParser := stateParserFactory.GetStateParser(stateType)
+ if stateParser == nil {
+ return nil, errors.New("State Type [" + stateType + "]
is not support")
+ }
+
+ _, stateExist := stateMachine.States()[stateName]
+ if stateExist {
+ return nil, errors.New("State [name:" + stateName + "]
already exists")
+ }
+
+ state, err := stateParser.Parse(stateName, stateMap)
+ if err != nil {
+ return nil, err
+ }
+
+ state.SetStateMachine(stateMachine)
+ stateMachine.States()[stateName] = state
+ }
+
+ //TODO setCompensateState
+ //for stateName, state := range stateMachine.GetStates() {
+ //
+ //}
+ //
+
+ return stateMachine, nil
+}
+
+type StateMachineJsonObject struct {
+ Name string `json:"Name"`
+ Comment string `json:"Comment"`
+ Version string `json:"Version"`
+ StartState string `json:"StartState"`
+ RecoverStrategy string
`json:"RecoverStrategy"`
+ Persist bool `json:"IsPersist"`
+ RetryPersistModeUpdate bool
`json:"IsRetryPersistModeUpdate"`
+ CompensatePersistModeUpdate bool
`json:"IsCompensatePersistModeUpdate"`
+ Type string `json:"Type"`
+ States map[string]interface{} `json:"States"`
+}
diff --git
a/pkg/saga/statemachine/statelang/parser/statemachine_json_parser_test.go
b/pkg/saga/statemachine/statelang/parser/statemachine_json_parser_test.go
new file mode 100644
index 00000000..c6d00b75
--- /dev/null
+++ b/pkg/saga/statemachine/statelang/parser/statemachine_json_parser_test.go
@@ -0,0 +1,13 @@
+package parser
+
+import (
+ "testing"
+)
+
+func TestParseChoice(t *testing.T) {
+ var content = "{\n \"Name\":\"ChoiceTest\",\n
\"Comment\":\"ChoiceTest\",\n \"StartState\":\"ChoiceState\",\n
\"Version\":\"0.0.1\",\n \"States\":{\n \"ChoiceState\":{\n
\"Type\":\"Choice\",\n \"Choices\":[\n {\n
\"Expression\":\"[a] == 1\",\n
\"Next\":\"SecondState\"\n },\n {\n
\"Expression\":\"[a] == 2\",\n \"Next\":\"Thir [...]
+ _, err := NewJSONStateMachineParser().Parse(content)
+ if err != nil {
+ t.Error("parse fail: " + err.Error())
+ }
+}
diff --git a/pkg/saga/statemachine/statelang/parser/statemachine_parser.go
b/pkg/saga/statemachine/statelang/parser/statemachine_parser.go
new file mode 100644
index 00000000..99ee95e1
--- /dev/null
+++ b/pkg/saga/statemachine/statelang/parser/statemachine_parser.go
@@ -0,0 +1,104 @@
+package parser
+
+import (
+ "github.com/pkg/errors"
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+ "sync"
+)
+
+type StateMachineParser interface {
+ GetType() string
+ Parse(content string) (statelang.StateMachine, error)
+}
+
+type StateParser interface {
+ StateType() string
+ Parse(stateName string, stateMap map[string]interface{})
(statelang.State, error)
+}
+
+type BaseStateParser struct {
+}
+
+func (b BaseStateParser) ParseBaseAttributes(stateName string, state
statelang.State, stateMap map[string]interface{}) error {
+ state.SetName(stateName)
+
+ comment, err := b.GetString(stateName, stateMap, "Comment")
+ if err != nil {
+ return err
+ }
+ state.SetComment(comment)
+
+ next, err := b.GetString(stateName, stateMap, "Next")
+ if err != nil {
+ return err
+ }
+
+ state.SetNext(next)
+ return nil
+}
+
+func (b BaseStateParser) GetString(stateName string, stateMap
map[string]interface{}, key string) (string, error) {
+ value := stateMap[key]
+ if value == nil {
+ var result string
+ return result, errors.New("State [" + stateName + "] " + key +
" not exist")
+ }
+
+ valueAsString, ok := value.(string)
+ if !ok {
+ var s string
+ return s, errors.New("State [" + stateName + "] " + key + "
illegal, required string")
+ }
+ return valueAsString, nil
+}
+
+func (b BaseStateParser) GetSlice(stateName string, stateMap
map[string]interface{}, key string) ([]interface{}, error) {
+ value := stateMap[key]
+
+ if value == nil {
+ var result []interface{}
+ return result, errors.New("State [" + stateName + "] " + key +
" not exist")
+ }
+
+ valueAsSlice, ok := value.([]interface{})
+ if !ok {
+ var result []interface{}
+ return result, errors.New("State [" + stateName + "] " + key +
" illegal, required slice")
+ }
+ return valueAsSlice, nil
+}
+
+type StateParserFactory interface {
+ RegistryStateParser(stateType string, stateParser StateParser)
+
+ GetStateParser(stateType string) StateParser
+}
+
+type DefaultStateParserFactory struct {
+ stateParserMap map[string]StateParser
+ mutex sync.Mutex
+}
+
+func NewDefaultStateParserFactory() *DefaultStateParserFactory {
+ var stateParserMap map[string]StateParser = make(map[string]StateParser)
+ return &DefaultStateParserFactory{
+ stateParserMap: stateParserMap,
+ }
+}
+
+// InitDefaultStateParser init StateParser by default
+func (d *DefaultStateParserFactory) InitDefaultStateParser() {
+ choiceStateParser := NewChoiceStateParser()
+
+ d.RegistryStateParser(choiceStateParser.StateType(), choiceStateParser)
+}
+
+func (d *DefaultStateParserFactory) RegistryStateParser(stateType string,
stateParser StateParser) {
+ d.mutex.Lock()
+ defer d.mutex.Unlock()
+ d.stateParserMap[stateType] = stateParser
+}
+
+func (d *DefaultStateParserFactory) GetStateParser(stateType string)
StateParser {
+ return d.stateParserMap[stateType]
+}
diff --git a/pkg/saga/statemachine/statelang/state.go
b/pkg/saga/statemachine/statelang/state.go
new file mode 100644
index 00000000..1e490ac0
--- /dev/null
+++ b/pkg/saga/statemachine/statelang/state.go
@@ -0,0 +1,71 @@
+package statelang
+
+type State interface {
+ Name() string
+
+ SetName(name string)
+
+ Comment() string
+
+ SetComment(comment string)
+
+ Type() string
+
+ SetType(typeName string)
+
+ Next() string
+
+ SetNext(next string)
+
+ StateMachine() StateMachine
+
+ SetStateMachine(machine StateMachine)
+}
+
+type BaseState struct {
+ name string `alias:"Name"`
+ comment string `alias:"Comment"`
+ typeName string `alias:"Type"`
+ next string `alias:"Next"`
+ stateMachine StateMachine
+}
+
+func (b *BaseState) Name() string {
+ return b.name
+}
+
+func (b *BaseState) SetName(name string) {
+ b.name = name
+}
+
+func (b *BaseState) Comment() string {
+ return b.comment
+}
+
+func (b *BaseState) SetComment(comment string) {
+ b.comment = comment
+}
+
+func (b *BaseState) Type() string {
+ return b.typeName
+}
+
+func (b *BaseState) SetType(typeName string) {
+ b.typeName = typeName
+}
+
+func (b *BaseState) Next() string {
+ return b.next
+}
+
+func (b *BaseState) SetNext(next string) {
+ b.next = next
+}
+
+func (b *BaseState) StateMachine() StateMachine {
+ return b.stateMachine
+}
+
+func (b *BaseState) SetStateMachine(machine StateMachine) {
+ b.stateMachine = machine
+}
diff --git a/pkg/saga/statemachine/statelang/state/choice_state.go
b/pkg/saga/statemachine/statelang/state/choice_state.go
new file mode 100644
index 00000000..e1cd973c
--- /dev/null
+++ b/pkg/saga/statemachine/statelang/state/choice_state.go
@@ -0,0 +1,74 @@
+package state
+
+import "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+
+type ChoiceState interface {
+ statelang.State
+
+ Choices() []Choice
+
+ Default() string
+}
+
+type Choice interface {
+ Expression() string
+
+ SetExpression(expression string)
+
+ Next() string
+
+ SetNext(next string)
+}
+
+type ChoiceStateImpl struct {
+ statelang.BaseState
+ defaultChoice string `alias:"Default"`
+ choices []Choice `alias:"Choices"`
+}
+
+func NewChoiceStateImpl() *ChoiceStateImpl {
+ return &ChoiceStateImpl{
+ choices: make([]Choice, 0),
+ }
+}
+
+func (choiceState *ChoiceStateImpl) Default() string {
+ return choiceState.defaultChoice
+}
+
+func (choiceState *ChoiceStateImpl) Choices() []Choice {
+ return choiceState.choices
+}
+
+func (choiceState *ChoiceStateImpl) SetDefault(defaultChoice string) {
+ choiceState.defaultChoice = defaultChoice
+}
+
+func (choiceState *ChoiceStateImpl) SetChoices(choices []Choice) {
+ choiceState.choices = choices
+}
+
+type ChoiceImpl struct {
+ expression string
+ next string
+}
+
+func NewChoiceImpl() *ChoiceImpl {
+ return &ChoiceImpl{}
+}
+
+func (c *ChoiceImpl) Expression() string {
+ return c.expression
+}
+
+func (c *ChoiceImpl) SetExpression(expression string) {
+ c.expression = expression
+}
+
+func (c *ChoiceImpl) Next() string {
+ return c.next
+}
+
+func (c *ChoiceImpl) SetNext(next string) {
+ c.next = next
+}
diff --git a/pkg/saga/statemachine/statelang/state/task_state.go
b/pkg/saga/statemachine/statelang/state/task_state.go
new file mode 100644
index 00000000..a9164d35
--- /dev/null
+++ b/pkg/saga/statemachine/statelang/state/task_state.go
@@ -0,0 +1,30 @@
+package state
+
+import (
+ "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+)
+
+type TaskState interface {
+ statelang.State
+
+ CompensateState() string
+
+ Status() map[string]string
+
+ Retry() []Retry
+}
+
+type Retry interface {
+ ErrorTypeNames() []string
+
+ IntervalSecond() float64
+
+ MaxAttempt() int
+
+ BackoffRate() float64
+}
+
+type ServiceTaskState interface {
+ TaskState
+ //TODO add serviceTask
+}
diff --git a/pkg/saga/statemachine/statelang/state_instance.go
b/pkg/saga/statemachine/statelang/state_instance.go
new file mode 100644
index 00000000..826faca6
--- /dev/null
+++ b/pkg/saga/statemachine/statelang/state_instance.go
@@ -0,0 +1,330 @@
+package statelang
+
+import "time"
+
+type StateInstance interface {
+ ID() string
+
+ SetID(id string)
+
+ Name() string
+
+ SetName(name string)
+
+ Type() string
+
+ SetType(typeName string)
+
+ ServiceName() string
+
+ SetServiceName(serviceName string)
+
+ ServiceMethod() string
+
+ SetServiceMethod(serviceMethod string)
+
+ ServiceType() string
+
+ SetServiceType(serviceType string)
+
+ BusinessKey() string
+
+ SetBusinessKey(businessKey string)
+
+ StartedTime() time.Time
+
+ SetStartedTime(startedTime time.Time)
+
+ UpdatedTime() time.Time
+
+ SetUpdatedTime(updateTime time.Time)
+
+ EndTime() time.Time
+
+ SetEndTime(endTime time.Time)
+
+ IsForUpdate() bool
+
+ SetForUpdate(forUpdate bool)
+
+ Error() error
+
+ SetError(err error)
+
+ InputParams() interface{}
+
+ SetInputParams(inputParams interface{})
+
+ OutputParams() interface{}
+
+ SetOutputParams(outputParams interface{})
+
+ Status() ExecutionStatus
+
+ SetStatus(status ExecutionStatus)
+
+ StateIDCompensatedFor() string
+
+ SetStateIDCompensatedFor(stateIdCompensatedFor string)
+
+ StateIDRetriedFor() string
+
+ SetStateIDRetriedFor(stateIdRetriedFor string)
+
+ CompensationState() StateInstance
+
+ SetCompensationState(compensationState StateInstance)
+
+ StateMachineInstance() StateMachineInstance
+
+ SetStateMachineInstance(stateMachineInstance StateMachineInstance)
+
+ IsIgnoreStatus() bool
+
+ SetIgnoreStatus(ignoreStatus bool)
+
+ IsForCompensation() bool
+
+ SerializedInputParams() interface{}
+
+ SetSerializedInputParams(serializedInputParams interface{})
+
+ SerializedOutputParams() interface{}
+
+ SetSerializedOutputParams(serializedOutputParams interface{})
+
+ SerializedError() interface{}
+
+ SetSerializedError(serializedErr interface{})
+
+ CompensationStatus() ExecutionStatus
+}
+
+type StateInstanceImpl struct {
+ id string
+ machineInstanceId string
+ name string
+ typeName string
+ serviceName string
+ serviceMethod string
+ serviceType string
+ businessKey string
+ startedTime time.Time
+ updatedTime time.Time
+ endTime time.Time
+ isForUpdate bool
+ err error
+ serializedErr interface{}
+ inputParams interface{}
+ serializedInputParams interface{}
+ outputParams interface{}
+ serializedOutputParams interface{}
+ status ExecutionStatus
+ stateIdCompensatedFor string
+ stateIdRetriedFor string
+ compensationState StateInstance
+ stateMachineInstance StateMachineInstance
+ ignoreStatus bool
+}
+
+func NewStateInstanceImpl() *StateInstanceImpl {
+ return &StateInstanceImpl{}
+}
+
+func (s *StateInstanceImpl) ID() string {
+ return s.id
+}
+
+func (s *StateInstanceImpl) SetID(id string) {
+ s.id = id
+}
+
+func (s *StateInstanceImpl) Name() string {
+ return s.name
+}
+
+func (s *StateInstanceImpl) SetName(name string) {
+ s.name = name
+}
+
+func (s *StateInstanceImpl) Type() string {
+ return s.typeName
+}
+
+func (s *StateInstanceImpl) SetType(typeName string) {
+ s.typeName = typeName
+}
+
+func (s *StateInstanceImpl) ServiceName() string {
+ return s.serviceName
+}
+
+func (s *StateInstanceImpl) SetServiceName(serviceName string) {
+ s.serviceName = serviceName
+}
+
+func (s *StateInstanceImpl) ServiceMethod() string {
+ return s.serviceMethod
+}
+
+func (s *StateInstanceImpl) SetServiceMethod(serviceMethod string) {
+ s.serviceMethod = serviceMethod
+}
+
+func (s *StateInstanceImpl) ServiceType() string {
+ return s.serviceType
+}
+
+func (s *StateInstanceImpl) SetServiceType(serviceType string) {
+ s.serviceType = serviceType
+}
+
+func (s *StateInstanceImpl) BusinessKey() string {
+ return s.businessKey
+}
+
+func (s *StateInstanceImpl) SetBusinessKey(businessKey string) {
+ s.businessKey = businessKey
+}
+
+func (s *StateInstanceImpl) StartedTime() time.Time {
+ return s.startedTime
+}
+
+func (s *StateInstanceImpl) SetStartedTime(startedTime time.Time) {
+ s.startedTime = startedTime
+}
+
+func (s *StateInstanceImpl) UpdatedTime() time.Time {
+ return s.updatedTime
+}
+
+func (s *StateInstanceImpl) SetUpdatedTime(updatedTime time.Time) {
+ s.updatedTime = updatedTime
+}
+
+func (s *StateInstanceImpl) EndTime() time.Time {
+ return s.endTime
+}
+
+func (s *StateInstanceImpl) SetEndTime(endTime time.Time) {
+ s.endTime = endTime
+}
+
+func (s *StateInstanceImpl) IsForUpdate() bool {
+ return s.isForUpdate
+}
+
+func (s *StateInstanceImpl) SetForUpdate(forUpdate bool) {
+ s.isForUpdate = forUpdate
+}
+
+func (s *StateInstanceImpl) Error() error {
+ return s.err
+}
+
+func (s *StateInstanceImpl) SetError(err error) {
+ s.err = err
+}
+
+func (s *StateInstanceImpl) InputParams() interface{} {
+ return s.inputParams
+}
+
+func (s *StateInstanceImpl) SetInputParams(inputParams interface{}) {
+ s.inputParams = inputParams
+}
+
+func (s *StateInstanceImpl) OutputParams() interface{} {
+ return s.outputParams
+}
+
+func (s *StateInstanceImpl) SetOutputParams(outputParams interface{}) {
+ s.outputParams = outputParams
+}
+
+func (s *StateInstanceImpl) Status() ExecutionStatus {
+ return s.status
+}
+
+func (s *StateInstanceImpl) SetStatus(status ExecutionStatus) {
+ s.status = status
+}
+
+func (s *StateInstanceImpl) StateIDCompensatedFor() string {
+ return s.stateIdCompensatedFor
+}
+
+func (s *StateInstanceImpl) SetStateIDCompensatedFor(stateIdCompensatedFor
string) {
+ s.stateIdCompensatedFor = stateIdCompensatedFor
+}
+
+func (s *StateInstanceImpl) StateIDRetriedFor() string {
+ return s.stateIdRetriedFor
+}
+
+func (s *StateInstanceImpl) SetStateIDRetriedFor(stateIdRetriedFor string) {
+ s.stateIdRetriedFor = stateIdRetriedFor
+}
+
+func (s *StateInstanceImpl) CompensationState() StateInstance {
+ return s.compensationState
+}
+
+func (s *StateInstanceImpl) SetCompensationState(compensationState
StateInstance) {
+ s.compensationState = compensationState
+}
+
+func (s *StateInstanceImpl) StateMachineInstance() StateMachineInstance {
+ return s.stateMachineInstance
+}
+
+func (s *StateInstanceImpl) SetStateMachineInstance(stateMachineInstance
StateMachineInstance) {
+ s.stateMachineInstance = stateMachineInstance
+}
+
+func (s *StateInstanceImpl) IsIgnoreStatus() bool {
+ return s.ignoreStatus
+}
+
+func (s *StateInstanceImpl) SetIgnoreStatus(ignoreStatus bool) {
+ s.ignoreStatus = ignoreStatus
+}
+
+func (s *StateInstanceImpl) IsForCompensation() bool {
+ return s.stateIdCompensatedFor == ""
+}
+
+func (s *StateInstanceImpl) SerializedInputParams() interface{} {
+ return s.serializedInputParams
+}
+
+func (s *StateInstanceImpl) SetSerializedInputParams(serializedInputParams
interface{}) {
+ s.serializedInputParams = serializedInputParams
+}
+
+func (s *StateInstanceImpl) SerializedOutputParams() interface{} {
+ return s.serializedOutputParams
+}
+
+func (s *StateInstanceImpl) SetSerializedOutputParams(serializedOutputParams
interface{}) {
+ s.serializedOutputParams = serializedOutputParams
+}
+
+func (s *StateInstanceImpl) SerializedError() interface{} {
+ return s.serializedErr
+}
+
+func (s *StateInstanceImpl) SetSerializedError(serializedErr interface{}) {
+ s.serializedErr = serializedErr
+}
+
+func (s *StateInstanceImpl) CompensationStatus() ExecutionStatus {
+ if s.compensationState != nil {
+ return s.compensationState.Status()
+ }
+
+ //return nil ExecutionStatus
+ var status ExecutionStatus
+ return status
+}
diff --git a/pkg/saga/statemachine/statelang/statemachine.go
b/pkg/saga/statemachine/statelang/statemachine.go
new file mode 100644
index 00000000..8b13f8d0
--- /dev/null
+++ b/pkg/saga/statemachine/statelang/statemachine.go
@@ -0,0 +1,261 @@
+package statelang
+
+import (
+ "time"
+)
+
+type StateMachineStatus string
+
+const (
+ Active StateMachineStatus = "Active"
+ Inactive StateMachineStatus = "Inactive"
+)
+
+// RecoverStrategy : Recover Strategy
+type RecoverStrategy string
+
+const (
+ //Compensate stateMachine
+ Compensate RecoverStrategy = "Compensate"
+ // Forward stateMachine
+ Forward RecoverStrategy = "Forward"
+)
+
+func ValueOfRecoverStrategy(recoverStrategy string) (RecoverStrategy, bool) {
+ switch recoverStrategy {
+ case "Compensate":
+ return Compensate, true
+ case "Forward":
+ return Forward, true
+ default:
+ var recoverStrategy RecoverStrategy
+ return recoverStrategy, false
+ }
+}
+
+type StateMachine interface {
+ ID() string
+
+ SetID(id string)
+
+ Name() string
+
+ SetName(name string)
+
+ Comment() string
+
+ SetComment(comment string)
+
+ StartState() string
+
+ SetStartState(startState string)
+
+ Version() string
+
+ SetVersion(version string)
+
+ States() map[string]State
+
+ State(stateName string) State
+
+ TenantId() string
+
+ SetTenantId(tenantId string)
+
+ AppName() string
+
+ SetAppName(appName string)
+
+ Type() string
+
+ SetType(typeName string)
+
+ Status() StateMachineStatus
+
+ SetStatus(status StateMachineStatus)
+
+ RecoverStrategy() RecoverStrategy
+
+ SetRecoverStrategy(recoverStrategy RecoverStrategy)
+
+ IsPersist() bool
+
+ SetPersist(persist bool)
+
+ IsRetryPersistModeUpdate() bool
+
+ SetRetryPersistModeUpdate(retryPersistModeUpdate bool)
+
+ IsCompensatePersistModeUpdate() bool
+
+ SetCompensatePersistModeUpdate(compensatePersistModeUpdate bool)
+
+ Content() string
+
+ SetContent(content string)
+
+ CreateTime() time.Time
+
+ SetCreateTime(createTime time.Time)
+}
+
+type StateMachineImpl struct {
+ id string
+ tenantId string
+ appName string
+ name string
+ comment string
+ version string
+ startState string
+ status StateMachineStatus
+ recoverStrategy RecoverStrategy
+ persist bool
+ retryPersistModeUpdate bool
+ compensatePersistModeUpdate bool
+ typeName string
+ content string
+ createTime time.Time
+ states map[string]State
+}
+
+func NewStateMachineImpl() *StateMachineImpl {
+ stateMap := make(map[string]State)
+ return &StateMachineImpl{
+ appName: "SEATA",
+ status: Active,
+ typeName: "STATE_LANG",
+ states: stateMap,
+ }
+}
+
+func (s *StateMachineImpl) ID() string {
+ return s.id
+}
+
+func (s *StateMachineImpl) SetID(id string) {
+ s.id = id
+}
+
+func (s *StateMachineImpl) Name() string {
+ return s.name
+}
+
+func (s *StateMachineImpl) SetName(name string) {
+ s.name = name
+}
+
+func (s *StateMachineImpl) SetComment(comment string) {
+ s.comment = comment
+}
+
+func (s *StateMachineImpl) Comment() string {
+ return s.comment
+}
+
+func (s *StateMachineImpl) StartState() string {
+ return s.startState
+}
+
+func (s *StateMachineImpl) SetStartState(startState string) {
+ s.startState = startState
+}
+
+func (s *StateMachineImpl) Version() string {
+ return s.version
+}
+
+func (s *StateMachineImpl) SetVersion(version string) {
+ s.version = version
+}
+
+func (s *StateMachineImpl) States() map[string]State {
+ return s.states
+}
+
+func (s *StateMachineImpl) State(stateName string) State {
+ if s.states == nil {
+ return nil
+ }
+
+ return s.states[stateName]
+}
+
+func (s *StateMachineImpl) TenantId() string {
+ return s.tenantId
+}
+
+func (s *StateMachineImpl) SetTenantId(tenantId string) {
+ s.tenantId = tenantId
+}
+
+func (s *StateMachineImpl) AppName() string {
+ return s.appName
+}
+
+func (s *StateMachineImpl) SetAppName(appName string) {
+ s.appName = appName
+}
+
+func (s *StateMachineImpl) Type() string {
+ return s.typeName
+}
+
+func (s *StateMachineImpl) SetType(typeName string) {
+ s.typeName = typeName
+}
+
+func (s *StateMachineImpl) Status() StateMachineStatus {
+ return s.status
+}
+
+func (s *StateMachineImpl) SetStatus(status StateMachineStatus) {
+ s.status = status
+}
+
+func (s *StateMachineImpl) RecoverStrategy() RecoverStrategy {
+ return s.recoverStrategy
+}
+
+func (s *StateMachineImpl) SetRecoverStrategy(recoverStrategy RecoverStrategy)
{
+ s.recoverStrategy = recoverStrategy
+}
+
+func (s *StateMachineImpl) IsPersist() bool {
+ return s.persist
+}
+
+func (s *StateMachineImpl) SetPersist(persist bool) {
+ s.persist = persist
+}
+
+func (s *StateMachineImpl) IsRetryPersistModeUpdate() bool {
+ return s.retryPersistModeUpdate
+}
+
+func (s *StateMachineImpl) SetRetryPersistModeUpdate(retryPersistModeUpdate
bool) {
+ s.retryPersistModeUpdate = retryPersistModeUpdate
+}
+
+func (s *StateMachineImpl) IsCompensatePersistModeUpdate() bool {
+ return s.compensatePersistModeUpdate
+}
+
+func (s *StateMachineImpl)
SetCompensatePersistModeUpdate(compensatePersistModeUpdate bool) {
+ s.compensatePersistModeUpdate = compensatePersistModeUpdate
+}
+
+func (s *StateMachineImpl) Content() string {
+ return s.content
+}
+
+func (s *StateMachineImpl) SetContent(content string) {
+ s.content = content
+}
+
+func (s *StateMachineImpl) CreateTime() time.Time {
+ return s.createTime
+}
+
+func (s *StateMachineImpl) SetCreateTime(createTime time.Time) {
+ s.createTime = createTime
+}
diff --git a/pkg/saga/statemachine/statelang/statemachine_instance.go
b/pkg/saga/statemachine/statelang/statemachine_instance.go
new file mode 100644
index 00000000..ff039024
--- /dev/null
+++ b/pkg/saga/statemachine/statelang/statemachine_instance.go
@@ -0,0 +1,316 @@
+package statelang
+
+import (
+ "sync"
+ "time"
+)
+
+type ExecutionStatus string
+
+const (
+ // RU Running
+ RU ExecutionStatus = "RU"
+ // SU Succeed
+ SU ExecutionStatus = "SU"
+ // FA Failed
+ FA ExecutionStatus = "FA"
+ // UN Unknown
+ UN ExecutionStatus = "UN"
+ // SK Skipped
+ SK ExecutionStatus = "SK"
+)
+
+type StateMachineInstance interface {
+ ID() string
+
+ SetID(id string)
+
+ MachineID() string
+
+ SetMachineID(machineID string)
+
+ TenantID() string
+
+ SetTenantID(tenantID string)
+
+ ParentID() string
+
+ SetParentID(parentID string)
+
+ StartedTime() time.Time
+
+ SetStartedTime(startedTime time.Time)
+
+ EndTime() time.Time
+
+ SetEndTime(endTime time.Time)
+
+ StateList() []StateInstance
+
+ State(stateId string) StateInstance
+
+ PutState(stateId string, stateInstance StateInstance)
+
+ Status() ExecutionStatus
+
+ SetStatus(status ExecutionStatus)
+
+ CompensationStatus() ExecutionStatus
+
+ SetCompensationStatus(compensationStatus ExecutionStatus)
+
+ IsRunning() bool
+
+ SetRunning(isRunning bool)
+
+ UpdatedTime() time.Time
+
+ SetUpdatedTime(updatedTime time.Time)
+
+ BusinessKey() string
+
+ SetBusinessKey(businessKey string)
+
+ Error() error
+
+ SetError(err error)
+
+ StartParams() map[string]interface{}
+
+ SetStartParams(startParams map[string]interface{})
+
+ EndParams() map[string]interface{}
+
+ SetEndParams(endParams map[string]interface{})
+
+ PutContext(key string, value interface{})
+
+ SetContext(context map[string]interface{})
+
+ StateMachine() StateMachine
+
+ SetStateMachine(stateMachine StateMachine)
+
+ SerializedStartParams() interface{}
+
+ SetSerializedStartParams(serializedStartParams interface{})
+
+ SerializedEndParams() interface{}
+
+ SetSerializedEndParams(serializedEndParams interface{})
+
+ SerializedError() interface{}
+
+ SetSerializedError(serializedError interface{})
+}
+
+type StateMachineInstanceImpl struct {
+ id string
+ machineId string
+ tenantId string
+ parentId string
+ businessKey string
+ startParams map[string]interface{}
+ serializedStartParams interface{}
+ startedTime time.Time
+ endTime time.Time
+ updatedTime time.Time
+ err error
+ serializedError interface{}
+ endParams map[string]interface{}
+ serializedEndParams interface{}
+ status ExecutionStatus
+ compensationStatus ExecutionStatus
+ isRunning bool
+ context map[string]interface{}
+ stateMachine StateMachine
+ stateList []StateInstance
+ stateMap map[string]StateInstance
+
+ contextMutex sync.RWMutex // Mutex to protect concurrent access to
context
+ stateMutex sync.RWMutex // Mutex to protect concurrent access to
stateList and stateMap
+}
+
+func NewStateMachineInstanceImpl() *StateMachineInstanceImpl {
+ return &StateMachineInstanceImpl{
+ startParams: make(map[string]interface{}),
+ endParams: make(map[string]interface{}),
+ stateList: make([]StateInstance, 0),
+ stateMap: make(map[string]StateInstance)}
+}
+
+func (s *StateMachineInstanceImpl) ID() string {
+ return s.id
+}
+
+func (s *StateMachineInstanceImpl) SetID(id string) {
+ s.id = id
+}
+
+func (s *StateMachineInstanceImpl) MachineID() string {
+ return s.machineId
+}
+
+func (s *StateMachineInstanceImpl) SetMachineID(machineID string) {
+ s.machineId = machineID
+}
+
+func (s *StateMachineInstanceImpl) TenantID() string {
+ return s.tenantId
+}
+
+func (s *StateMachineInstanceImpl) SetTenantID(tenantID string) {
+ s.tenantId = tenantID
+}
+
+func (s *StateMachineInstanceImpl) ParentID() string {
+ return s.parentId
+}
+
+func (s *StateMachineInstanceImpl) SetParentID(parentID string) {
+ s.parentId = parentID
+}
+
+func (s *StateMachineInstanceImpl) StartedTime() time.Time {
+ return s.startedTime
+}
+
+func (s *StateMachineInstanceImpl) SetStartedTime(startedTime time.Time) {
+ s.startedTime = startedTime
+}
+
+func (s *StateMachineInstanceImpl) EndTime() time.Time {
+ return s.endTime
+}
+
+func (s *StateMachineInstanceImpl) SetEndTime(endTime time.Time) {
+ s.endTime = endTime
+}
+
+func (s *StateMachineInstanceImpl) StateList() []StateInstance {
+ return s.stateList
+}
+
+func (s *StateMachineInstanceImpl) State(stateId string) StateInstance {
+ s.stateMutex.RLock()
+ defer s.stateMutex.RUnlock()
+
+ return s.stateMap[stateId]
+}
+
+func (s *StateMachineInstanceImpl) PutState(stateId string, stateInstance
StateInstance) {
+ s.stateMutex.Lock()
+ defer s.stateMutex.Unlock()
+
+ stateInstance.SetStateMachineInstance(s)
+ s.stateMap[stateId] = stateInstance
+ s.stateList = append(s.stateList, stateInstance)
+}
+
+func (s *StateMachineInstanceImpl) Status() ExecutionStatus {
+ return s.status
+}
+
+func (s *StateMachineInstanceImpl) SetStatus(status ExecutionStatus) {
+ s.status = status
+}
+
+func (s *StateMachineInstanceImpl) CompensationStatus() ExecutionStatus {
+ return s.compensationStatus
+}
+
+func (s *StateMachineInstanceImpl) SetCompensationStatus(compensationStatus
ExecutionStatus) {
+ s.compensationStatus = compensationStatus
+}
+
+func (s *StateMachineInstanceImpl) IsRunning() bool {
+ return s.isRunning
+}
+
+func (s *StateMachineInstanceImpl) SetRunning(isRunning bool) {
+ s.isRunning = isRunning
+}
+
+func (s *StateMachineInstanceImpl) UpdatedTime() time.Time {
+ return s.updatedTime
+}
+
+func (s *StateMachineInstanceImpl) SetUpdatedTime(updatedTime time.Time) {
+ s.updatedTime = updatedTime
+}
+
+func (s *StateMachineInstanceImpl) BusinessKey() string {
+ return s.businessKey
+}
+
+func (s *StateMachineInstanceImpl) SetBusinessKey(businessKey string) {
+ s.businessKey = businessKey
+}
+
+func (s *StateMachineInstanceImpl) Error() error {
+ return s.err
+}
+
+func (s *StateMachineInstanceImpl) SetError(err error) {
+ s.err = err
+}
+
+func (s *StateMachineInstanceImpl) StartParams() map[string]interface{} {
+ return s.startParams
+}
+
+func (s *StateMachineInstanceImpl) SetStartParams(startParams
map[string]interface{}) {
+ s.startParams = startParams
+}
+
+func (s *StateMachineInstanceImpl) EndParams() map[string]interface{} {
+ return s.endParams
+}
+
+func (s *StateMachineInstanceImpl) SetEndParams(endParams
map[string]interface{}) {
+ s.endParams = endParams
+}
+
+func (s *StateMachineInstanceImpl) PutContext(key string, value interface{}) {
+ s.contextMutex.Lock()
+ defer s.contextMutex.Unlock()
+
+ s.context[key] = value
+}
+
+func (s *StateMachineInstanceImpl) SetContext(context map[string]interface{}) {
+ s.context = context
+}
+
+func (s *StateMachineInstanceImpl) StateMachine() StateMachine {
+ return s.stateMachine
+}
+
+func (s *StateMachineInstanceImpl) SetStateMachine(stateMachine StateMachine) {
+ s.stateMachine = stateMachine
+ s.machineId = stateMachine.ID()
+}
+
+func (s *StateMachineInstanceImpl) SerializedStartParams() interface{} {
+ return s.serializedStartParams
+}
+
+func (s *StateMachineInstanceImpl)
SetSerializedStartParams(serializedStartParams interface{}) {
+ s.serializedStartParams = serializedStartParams
+}
+
+func (s *StateMachineInstanceImpl) SerializedEndParams() interface{} {
+ return s.endParams
+}
+
+func (s *StateMachineInstanceImpl) SetSerializedEndParams(serializedEndParams
interface{}) {
+ s.serializedEndParams = serializedEndParams
+}
+
+func (s *StateMachineInstanceImpl) SerializedError() interface{} {
+ return s.serializedError
+}
+
+func (s *StateMachineInstanceImpl) SetSerializedError(serializedError
interface{}) {
+ s.serializedError = serializedError
+}
diff --git a/pkg/util/reflectx/unmarkshaler.go
b/pkg/util/reflectx/unmarkshaler.go
new file mode 100644
index 00000000..73acb276
--- /dev/null
+++ b/pkg/util/reflectx/unmarkshaler.go
@@ -0,0 +1,147 @@
+package reflectx
+
+import (
+ "fmt"
+ "github.com/pkg/errors"
+ "reflect"
+ "unicode"
+)
+
+// MapToStruct some state can use this util to parse
+// TODO 性能测试,性能差的话,直接去解析,不使用反射
+func MapToStruct(stateName string, obj interface{}, stateMap
map[string]interface{}) error {
+ objVal := reflect.ValueOf(obj)
+ if objVal.Kind() != reflect.Pointer {
+ return errors.New(fmt.Sprintf("State [%s] value required a
pointer", stateName))
+ }
+
+ structValue := objVal.Elem()
+ if structValue.Kind() != reflect.Struct {
+ return errors.New(fmt.Sprintf("State [%s] value elem required a
struct", stateName))
+ }
+
+ structType := structValue.Type()
+ for key, value := range stateMap {
+ //Get field, get alias first
+ field, found := getField(structType, key)
+ if !found {
+ continue
+ }
+
+ fieldVal := structValue.FieldByName(field.Name)
+ if !fieldVal.IsValid() {
+ return errors.New(fmt.Sprintf("State [%s] not support
[%s] filed", stateName, key))
+ }
+
+ //Get setMethod
+ var setMethod reflect.Value
+ if !fieldVal.CanSet() {
+ setMethod = getFiledSetMethod(field.Name, objVal)
+
+ if !setMethod.IsValid() {
+ fieldAliasName := field.Tag.Get("alias")
+ setMethod = getFiledSetMethod(fieldAliasName,
objVal)
+ }
+
+ if !setMethod.IsValid() {
+ return errors.New(fmt.Sprintf("State [%s] [%s]
field not support setMethod", stateName, key))
+ }
+ setMethodType := setMethod.Type()
+ if !(setMethodType.NumIn() == 1 && setMethodType.In(0)
== fieldVal.Type()) {
+ return errors.New(fmt.Sprintf("State [%s] [%s]
field setMethod illegal", stateName, key))
+ }
+ }
+
+ val := reflect.ValueOf(value)
+ if fieldVal.Kind() == reflect.Struct {
+ //map[string]interface{}
+ if val.Kind() != reflect.Map {
+ return errors.New(fmt.Sprintf("State [%s] [%s]
field type required map", stateName, key))
+ }
+
+ err := MapToStruct(stateName,
fieldVal.Addr().Interface(), value.(map[string]interface{}))
+ if err != nil {
+ return err
+ }
+ } else if fieldVal.Kind() == reflect.Slice {
+ if val.Kind() != reflect.Slice {
+ return errors.New(fmt.Sprintf("State [%s] [%s]
field type required slice", stateName, key))
+ }
+
+ sliceType := fieldVal.Type().Elem()
+ newSlice := reflect.MakeSlice(fieldVal.Type(), 0,
val.Len())
+
+ for i := 0; i < val.Len(); i++ {
+ newElem := reflect.New(sliceType.Elem())
+ elemMap :=
val.Index(i).Interface().(map[string]interface{})
+ err := MapToStruct(stateName,
newElem.Interface(), elemMap)
+ if err != nil {
+ return err
+ }
+ reflect.Append(newSlice, newElem.Elem())
+ }
+ setFiled(fieldVal, setMethod, newSlice)
+ } else if fieldVal.Kind() == reflect.Map {
+ if val.Kind() != reflect.Map {
+ return errors.New(fmt.Sprintf("State [%s] [%s]
field type required map", stateName, key))
+ }
+
+ mapType := field.Type
+ newMap := reflect.MakeMap(mapType)
+
+ for _, key := range val.MapKeys() {
+ newVal := reflect.New(mapType.Elem().Elem())
+ elemMap :=
val.MapIndex(key).Interface().(map[string]interface{})
+ err := MapToStruct(stateName,
newVal.Interface(), elemMap)
+ if err != nil {
+ return err
+ }
+ newMap.SetMapIndex(key, newVal.Elem())
+ }
+ setFiled(fieldVal, setMethod, newMap)
+ } else {
+ setFiled(fieldVal, setMethod, val)
+ }
+ }
+ return nil
+}
+
+func getField(t reflect.Type, name string) (reflect.StructField, bool) {
+ for i := 0; i < t.NumField(); i++ {
+ field := t.Field(i)
+ tag, hasAliasTag := field.Tag.Lookup("alias")
+
+ if (hasAliasTag && tag == name) || (!hasAliasTag && field.Name
== name) {
+ return field, true
+ }
+
+ if field.Anonymous {
+ embeddedField, ok := getField(field.Type, name)
+ if ok {
+ return embeddedField, true
+ }
+ }
+ }
+
+ return reflect.StructField{}, false
+}
+
+func getFiledSetMethod(name string, structValue reflect.Value) reflect.Value {
+ fieldNameSlice := []rune(name)
+ fieldNameSlice[0] = unicode.ToUpper(fieldNameSlice[0])
+
+ setMethodName := "Set" + string(fieldNameSlice)
+
+ setMethod := structValue.MethodByName(setMethodName)
+ return setMethod
+}
+
+func setFiled(fieldVal reflect.Value, setMethod reflect.Value, val
reflect.Value) {
+ if !fieldVal.CanSet() {
+ setMethod.Call([]reflect.Value{
+ val,
+ })
+ } else {
+ fieldVal.Set(val)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]