This is an automated email from the ASF dual-hosted git repository.

yixia pushed a commit to branch feature/saga
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git


The following commit(s) were added to refs/heads/feature/saga by this push:
     new 58fc3e13 feature: init saga framework (#635)
58fc3e13 is described below

commit 58fc3e13dffa176082f39a3a989b20b2752c47e0
Author: wt_better <[email protected]>
AuthorDate: Sat Dec 23 23:01:17 2023 +0800

    feature: init saga framework (#635)
    
    init saga framework
---
 pkg/saga/readme.md                                 |  31 ++
 pkg/saga/statemachine/engine/contant.go            |  20 ++
 pkg/saga/statemachine/engine/events/event.go       |   4 +
 pkg/saga/statemachine/engine/events/event_bus.go   |  49 +++
 .../statemachine/engine/events/event_consumer.go   |  29 ++
 .../statemachine/engine/events/event_publisher.go  |  19 ++
 pkg/saga/statemachine/engine/expr/expression.go    |  10 +
 pkg/saga/statemachine/engine/invoker/invoker.go    |  14 +
 .../engine/process_ctrl/bussiness_processor.go     |  92 ++++++
 .../engine/process_ctrl/instruction/instruction.go |  25 ++
 .../engine/process_ctrl/process_context.go         | 284 ++++++++++++++++++
 .../engine/process_ctrl/process_type.go            |   7 +
 .../engine/process_ctrl/statemachine_processor.go  | 139 +++++++++
 .../engine/process_ctrl_statemachine_engine.go     | 124 ++++++++
 pkg/saga/statemachine/engine/sequence/sequence.go  |   5 +
 pkg/saga/statemachine/engine/sequence/uuid.go      |  14 +
 .../statemachine/engine/statemachine_config.go     |  44 +++
 .../statemachine/engine/statemachine_engine.go     |  16 +
 .../engine/status_decision/status_decision.go      |   4 +
 .../engine/store/statemachine_store.go             |  60 ++++
 .../statelang/parser/choice_state_json_parser.go   |  72 +++++
 .../statelang/parser/statemachine_json_parser.go   | 100 +++++++
 .../parser/statemachine_json_parser_test.go        |  13 +
 .../statelang/parser/statemachine_parser.go        | 104 +++++++
 pkg/saga/statemachine/statelang/state.go           |  71 +++++
 .../statemachine/statelang/state/choice_state.go   |  74 +++++
 .../statemachine/statelang/state/task_state.go     |  30 ++
 pkg/saga/statemachine/statelang/state_instance.go  | 330 +++++++++++++++++++++
 pkg/saga/statemachine/statelang/statemachine.go    | 261 ++++++++++++++++
 .../statelang/statemachine_instance.go             | 316 ++++++++++++++++++++
 pkg/util/reflectx/unmarkshaler.go                  | 147 +++++++++
 31 files changed, 2508 insertions(+)

diff --git a/pkg/saga/readme.md b/pkg/saga/readme.md
new file mode 100644
index 00000000..9aa593da
--- /dev/null
+++ b/pkg/saga/readme.md
@@ -0,0 +1,31 @@
+
+# seata saga
+
+未来计划有三种使用方式
+
+- 基于状态机引擎的 json
+   link: statemachine_engine#Start
+- stream builder
+    stateMachine.serviceTask().build().Start
+- 二阶段方式saga,类似tcc使用
+
+上面1、2是以来[statemachine](statemachine),状态机引擎实现的,3相对比较独立。
+
+
+状态机的实现在:saga-statemachine包中
+其中[statelang](statemachine%2Fstatelang)是状态机语言的解析,目前实现的是json解析方式,状态机语言可以参考:
+https://seata.io/docs/user/mode/saga
+
+状态机json执行的入口类是:[statemachine_engine.go](statemachine%2Fengine%2Fstatemachine_engine.go)
+
+下面简单说下engine中各个包的作用:
+events:saga的是基于事件处理的,其中是event、eventBus的实现
+expr:表达式声明、解析、执行
+invoker:声明了serviceInvoker、scriptInvoker等接口、task调用管理、执行都在这个包中,例如httpInvoker
+process_ctrl:状态机处理流程:上下文、执行、事件流转
+sequence:分布式id
+store:状态机存储接口、实现
+status_decision:状态机状态决策
+
+
+
diff --git a/pkg/saga/statemachine/engine/contant.go 
b/pkg/saga/statemachine/engine/contant.go
new file mode 100644
index 00000000..9527fc29
--- /dev/null
+++ b/pkg/saga/statemachine/engine/contant.go
@@ -0,0 +1,20 @@
+package engine
+
+const (
+       VarNameProcessType         string = "_ProcessType_"
+       VarNameOperationName       string = "_operation_name_"
+       OperationNameStart         string = "start"
+       VarNameAsyncCallback       string = "_async_callback_"
+       VarNameStateMachineInst    string = "_current_statemachine_instance_"
+       VarNameStateMachine        string = "_current_statemachine_"
+       VarNameStateMachineEngine  string = "_current_statemachine_engine_"
+       VarNameStateMachineConfig  string = "_statemachine_config_"
+       VarNameStateMachineContext string = "context"
+       VarNameIsAsyncExecution    string = "_is_async_execution_"
+       VarNameStateInst           string = "_current_state_instance_"
+       SeqEntityStateMachineInst  string = "STATE_MACHINE_INST"
+       VarNameBusinesskey         string = "_business_key_"
+       VarNameParentId            string = "_parent_id_"
+       StateTypeServiceTask       string = "ServiceTask"
+       StateTypeChoice            string = "Choice"
+)
diff --git a/pkg/saga/statemachine/engine/events/event.go 
b/pkg/saga/statemachine/engine/events/event.go
new file mode 100644
index 00000000..8f142713
--- /dev/null
+++ b/pkg/saga/statemachine/engine/events/event.go
@@ -0,0 +1,4 @@
+package events
+
+type Event interface {
+}
diff --git a/pkg/saga/statemachine/engine/events/event_bus.go 
b/pkg/saga/statemachine/engine/events/event_bus.go
new file mode 100644
index 00000000..77a1e616
--- /dev/null
+++ b/pkg/saga/statemachine/engine/events/event_bus.go
@@ -0,0 +1,49 @@
+package events
+
+import (
+       "context"
+)
+
+type EventBus interface {
+       Offer(ctx context.Context, event Event) (bool, error)
+
+       RegisterEventConsumer(consumer EventConsumer)
+}
+
+type BaseEventBus struct {
+       eventConsumerList []EventConsumer
+}
+
+func (b *BaseEventBus) RegisterEventConsumer(consumer EventConsumer) {
+       if b.eventConsumerList == nil {
+               b.eventConsumerList = make([]EventConsumer, 0)
+       }
+       b.eventConsumerList = append(b.eventConsumerList, consumer)
+}
+
+func (b *BaseEventBus) GetEventConsumerList(event Event) []EventConsumer {
+       var acceptedConsumerList = make([]EventConsumer, 0)
+       for i := range b.eventConsumerList {
+               eventConsumer := b.eventConsumerList[i]
+               if eventConsumer.Accept(event) {
+                       acceptedConsumerList = append(acceptedConsumerList, 
eventConsumer)
+               }
+       }
+       return acceptedConsumerList
+}
+
+type DirectEventBus struct {
+       BaseEventBus
+}
+
+func (d DirectEventBus) Offer(ctx context.Context, event Event) (bool, error) {
+       eventConsumerList := d.GetEventConsumerList(event)
+       if len(eventConsumerList) == 0 {
+               //TODO logger
+               return false, nil
+       }
+
+       //processContext, _ := event.(process_ctrl.ProcessContext)
+
+       return true, nil
+}
diff --git a/pkg/saga/statemachine/engine/events/event_consumer.go 
b/pkg/saga/statemachine/engine/events/event_consumer.go
new file mode 100644
index 00000000..426da15e
--- /dev/null
+++ b/pkg/saga/statemachine/engine/events/event_consumer.go
@@ -0,0 +1,29 @@
+package events
+
+import (
+       "context"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
+)
+
+type EventConsumer interface {
+       Accept(event Event) bool
+
+       Process(ctx context.Context, event Event) error
+}
+
+type ProcessCtrlEventConsumer struct {
+}
+
+func (p ProcessCtrlEventConsumer) Accept(event Event) bool {
+       if event == nil {
+               return false
+       }
+
+       _, ok := event.(process_ctrl.ProcessContext)
+       return ok
+}
+
+func (p ProcessCtrlEventConsumer) Process(ctx context.Context, event Event) 
error {
+       //TODO implement me
+       panic("implement me")
+}
diff --git a/pkg/saga/statemachine/engine/events/event_publisher.go 
b/pkg/saga/statemachine/engine/events/event_publisher.go
new file mode 100644
index 00000000..1fbfaec2
--- /dev/null
+++ b/pkg/saga/statemachine/engine/events/event_publisher.go
@@ -0,0 +1,19 @@
+package events
+
+import "context"
+
+type EventPublisher interface {
+       PushEvent(ctx context.Context, event Event) (bool, error)
+}
+
+type ProcessCtrlEventPublisher struct {
+       eventBus EventBus
+}
+
+func NewProcessCtrlEventPublisher(eventBus EventBus) 
*ProcessCtrlEventPublisher {
+       return &ProcessCtrlEventPublisher{eventBus: eventBus}
+}
+
+func (p ProcessCtrlEventPublisher) PushEvent(ctx context.Context, event Event) 
(bool, error) {
+       return p.eventBus.Offer(ctx, event)
+}
diff --git a/pkg/saga/statemachine/engine/expr/expression.go 
b/pkg/saga/statemachine/engine/expr/expression.go
new file mode 100644
index 00000000..9706cda5
--- /dev/null
+++ b/pkg/saga/statemachine/engine/expr/expression.go
@@ -0,0 +1,10 @@
+package expr
+
+type ExpressionResolver interface {
+}
+
+type Expression interface {
+}
+
+type ExpressionFactoryManager struct {
+}
diff --git a/pkg/saga/statemachine/engine/invoker/invoker.go 
b/pkg/saga/statemachine/engine/invoker/invoker.go
new file mode 100644
index 00000000..8c2ec069
--- /dev/null
+++ b/pkg/saga/statemachine/engine/invoker/invoker.go
@@ -0,0 +1,14 @@
+package invoker
+
+type ScriptInvokerManager interface {
+}
+
+type ScriptInvoker interface {
+}
+
+type ServiceInvokerManager interface {
+}
+
+type ServiceInvoker interface {
+       Invoke()
+}
diff --git a/pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go 
b/pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go
new file mode 100644
index 00000000..198092a1
--- /dev/null
+++ b/pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go
@@ -0,0 +1,92 @@
+package process_ctrl
+
+import (
+       "context"
+       "github.com/pkg/errors"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine"
+       "sync"
+)
+
+type BusinessProcessor interface {
+       Process(ctx context.Context, processContext ProcessContext) error
+
+       Route(ctx context.Context, processContext ProcessContext) error
+}
+
+type DefaultBusinessProcessor struct {
+       processHandlers map[string]ProcessHandler
+       routerHandlers  map[string]RouterHandler
+       mu              sync.RWMutex
+}
+
+func (d *DefaultBusinessProcessor) RegistryProcessHandler(processType 
ProcessType, processHandler ProcessHandler) {
+       d.mu.Lock()
+       defer d.mu.Unlock()
+
+       d.processHandlers[string(processType)] = processHandler
+}
+
+func (d *DefaultBusinessProcessor) RegistryRouterHandler(processType 
ProcessType, routerHandler RouterHandler) {
+       d.mu.Lock()
+       defer d.mu.Unlock()
+
+       d.routerHandlers[string(processType)] = routerHandler
+}
+
+func (d *DefaultBusinessProcessor) Process(ctx context.Context, processContext 
ProcessContext) error {
+       processType := d.matchProcessType(processContext)
+
+       processHandler, err := d.getProcessHandler(processType)
+       if err != nil {
+               return err
+       }
+
+       return processHandler.Process(ctx, processContext)
+}
+
+func (d *DefaultBusinessProcessor) Route(ctx context.Context, processContext 
ProcessContext) error {
+       processType := d.matchProcessType(processContext)
+
+       routerHandler, err := d.getRouterHandler(processType)
+       if err != nil {
+               return err
+       }
+
+       return routerHandler.Route(ctx, processContext)
+}
+
+func (d *DefaultBusinessProcessor) getProcessHandler(processType ProcessType) 
(ProcessHandler, error) {
+       d.mu.RLock()
+       defer d.mu.RUnlock()
+       processHandler, ok := d.processHandlers[string(processType)]
+       if !ok {
+               return nil, errors.New("Cannot find process handler by type " + 
string(processType))
+       }
+       return processHandler, nil
+}
+
+func (d *DefaultBusinessProcessor) getRouterHandler(processType ProcessType) 
(RouterHandler, error) {
+       d.mu.RLock()
+       defer d.mu.RUnlock()
+       routerHandler, ok := d.routerHandlers[string(processType)]
+       if !ok {
+               return nil, errors.New("Cannot find router handler by type " + 
string(processType))
+       }
+       return routerHandler, nil
+}
+
+func (d *DefaultBusinessProcessor) matchProcessType(processContext 
ProcessContext) ProcessType {
+       ok := processContext.HasVariable(engine.VarNameProcessType)
+       if ok {
+               return 
processContext.GetVariable(engine.VarNameProcessType).(ProcessType)
+       }
+       return StateLang
+}
+
+type ProcessHandler interface {
+       Process(ctx context.Context, processContext ProcessContext) error
+}
+
+type RouterHandler interface {
+       Route(ctx context.Context, processContext ProcessContext) error
+}
diff --git 
a/pkg/saga/statemachine/engine/process_ctrl/instruction/instruction.go 
b/pkg/saga/statemachine/engine/process_ctrl/instruction/instruction.go
new file mode 100644
index 00000000..1537dc67
--- /dev/null
+++ b/pkg/saga/statemachine/engine/process_ctrl/instruction/instruction.go
@@ -0,0 +1,25 @@
+package instruction
+
+import (
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+)
+
+type Instruction interface {
+}
+
+type StateInstruction struct {
+       StateName        string
+       StateMachineName string
+       TenantId         string
+       End              bool
+}
+
+func (s StateInstruction) GetState(context process_ctrl.ProcessContext) 
(statelang.State, error) {
+       //TODO implement me
+       panic("implement me")
+}
+
+func NewStateInstruction(stateMachineName string, tenantId string) 
*StateInstruction {
+       return &StateInstruction{StateMachineName: stateMachineName, TenantId: 
tenantId}
+}
diff --git a/pkg/saga/statemachine/engine/process_ctrl/process_context.go 
b/pkg/saga/statemachine/engine/process_ctrl/process_context.go
new file mode 100644
index 00000000..6531b1af
--- /dev/null
+++ b/pkg/saga/statemachine/engine/process_ctrl/process_context.go
@@ -0,0 +1,284 @@
+package process_ctrl
+
+import (
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine"
+       
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl/instruction"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+       "sync"
+)
+
+type ProcessContext interface {
+       GetVariable(name string) interface{}
+
+       SetVariable(name string, value interface{})
+
+       GetVariables() map[string]interface{}
+
+       SetVariables(variables map[string]interface{})
+
+       RemoveVariable(name string) interface{}
+
+       HasVariable(name string) bool
+
+       GetInstruction() instruction.Instruction
+
+       SetInstruction(instruction instruction.Instruction)
+}
+
+type HierarchicalProcessContext interface {
+       ProcessContext
+
+       GetVariableLocally(name string) interface{}
+
+       SetVariableLocally(name string, value interface{})
+
+       GetVariablesLocally() map[string]interface{}
+
+       SetVariablesLocally(variables map[string]interface{})
+
+       RemoveVariableLocally(name string) interface{}
+
+       HasVariableLocally(name string) bool
+
+       ClearLocally()
+}
+
+type ProcessContextImpl struct {
+       parent      ProcessContext
+       mu          sync.RWMutex
+       mp          map[string]interface{}
+       instruction instruction.Instruction
+}
+
+func (p *ProcessContextImpl) GetVariable(name string) interface{} {
+       p.mu.RLock()
+       defer p.mu.RUnlock()
+
+       value, ok := p.mp[name]
+       if ok {
+               return value
+       }
+
+       if p.parent != nil {
+               return p.parent.GetVariable(name)
+       }
+
+       return nil
+}
+
+func (p *ProcessContextImpl) SetVariable(name string, value interface{}) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       _, ok := p.mp[name]
+       if ok {
+               p.mp[name] = value
+       } else {
+               if p.parent != nil {
+                       p.parent.SetVariable(name, value)
+               } else {
+                       p.mp[name] = value
+               }
+       }
+}
+
+func (p *ProcessContextImpl) GetVariables() map[string]interface{} {
+       p.mu.RLock()
+       defer p.mu.RUnlock()
+
+       newVariablesMap := make(map[string]interface{})
+       if p.parent != nil {
+               variables := p.parent.GetVariables()
+               for k, v := range variables {
+                       newVariablesMap[k] = v
+               }
+       }
+
+       for k, v := range p.mp {
+               newVariablesMap[k] = v
+       }
+
+       return newVariablesMap
+}
+
+func (p *ProcessContextImpl) SetVariables(variables map[string]interface{}) {
+       for k, v := range variables {
+               p.SetVariable(k, v)
+       }
+}
+
+func (p *ProcessContextImpl) RemoveVariable(name string) interface{} {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       value, ok := p.mp[name]
+       if ok {
+               delete(p.mp, name)
+               return value
+       }
+
+       if p.parent != nil {
+               return p.parent.RemoveVariable(name)
+       }
+
+       return nil
+}
+
+func (p *ProcessContextImpl) HasVariable(name string) bool {
+       p.mu.RLock()
+       defer p.mu.RUnlock()
+
+       _, ok := p.mp[name]
+       if ok {
+               return true
+       }
+
+       if p.parent != nil {
+               return p.parent.HasVariable(name)
+       }
+
+       return false
+}
+
+func (p *ProcessContextImpl) GetInstruction() instruction.Instruction {
+       return p.instruction
+}
+
+func (p *ProcessContextImpl) SetInstruction(instruction 
instruction.Instruction) {
+       p.instruction = instruction
+}
+
+func (p *ProcessContextImpl) GetVariableLocally(name string) interface{} {
+       p.mu.RLock()
+       defer p.mu.RUnlock()
+
+       value, _ := p.mp[name]
+       return value
+}
+
+func (p *ProcessContextImpl) SetVariableLocally(name string, value 
interface{}) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       p.mp[name] = value
+}
+
+func (p *ProcessContextImpl) GetVariablesLocally() map[string]interface{} {
+       p.mu.RLock()
+       defer p.mu.RUnlock()
+
+       newVariablesMap := make(map[string]interface{}, len(p.mp))
+       for k, v := range p.mp {
+               newVariablesMap[k] = v
+       }
+       return newVariablesMap
+}
+
+func (p *ProcessContextImpl) SetVariablesLocally(variables 
map[string]interface{}) {
+       for k, v := range variables {
+               p.SetVariableLocally(k, v)
+       }
+}
+
+func (p *ProcessContextImpl) RemoveVariableLocally(name string) interface{} {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       value, _ := p.mp[name]
+       delete(p.mp, name)
+       return value
+}
+
+func (p *ProcessContextImpl) HasVariableLocally(name string) bool {
+       p.mu.RLock()
+       defer p.mu.RUnlock()
+
+       _, ok := p.mp[name]
+       return ok
+}
+
+func (p *ProcessContextImpl) ClearLocally() {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       p.mp = map[string]interface{}{}
+}
+
+// ProcessContextBuilder process_ctrl builder
+type ProcessContextBuilder struct {
+       processContext ProcessContext
+}
+
+func NewProcessContextBuilder() *ProcessContextBuilder {
+       processContextImpl := &ProcessContextImpl{}
+       return &ProcessContextBuilder{processContextImpl}
+}
+
+func (p *ProcessContextBuilder) WithProcessType(processType ProcessType) 
*ProcessContextBuilder {
+       p.processContext.SetVariable(engine.VarNameProcessType, processType)
+       return p
+}
+
+func (p *ProcessContextBuilder) WithOperationName(operationName string) 
*ProcessContextBuilder {
+       p.processContext.SetVariable(engine.VarNameOperationName, operationName)
+       return p
+}
+
+func (p *ProcessContextBuilder) WithAsyncCallback(callBack engine.CallBack) 
*ProcessContextBuilder {
+       if callBack != nil {
+               p.processContext.SetVariable(engine.VarNameAsyncCallback, 
callBack)
+       }
+
+       return p
+}
+
+func (p *ProcessContextBuilder) WithInstruction(instruction 
instruction.Instruction) *ProcessContextBuilder {
+       if instruction != nil {
+               p.processContext.SetInstruction(instruction)
+       }
+
+       return p
+}
+
+func (p *ProcessContextBuilder) WithStateMachineInstance(stateMachineInstance 
statelang.StateMachineInstance) *ProcessContextBuilder {
+       if stateMachineInstance != nil {
+               p.processContext.SetVariable(engine.VarNameStateMachineInst, 
stateMachineInstance)
+               p.processContext.SetVariable(engine.VarNameStateMachine, 
stateMachineInstance.StateMachine())
+       }
+
+       return p
+}
+
+func (p *ProcessContextBuilder) WithStateMachineEngine(stateMachineEngine 
engine.StateMachineEngine) *ProcessContextBuilder {
+       if stateMachineEngine != nil {
+               p.processContext.SetVariable(engine.VarNameStateMachineEngine, 
stateMachineEngine)
+       }
+
+       return p
+}
+
+func (p *ProcessContextBuilder) WithStateMachineConfig(stateMachineConfig 
engine.StateMachineConfig) *ProcessContextBuilder {
+       if stateMachineConfig != nil {
+               p.processContext.SetVariable(engine.VarNameStateMachineConfig, 
stateMachineConfig)
+       }
+
+       return p
+}
+
+func (p *ProcessContextBuilder) WithStateMachineContextVariables(contextMap 
map[string]interface{}) *ProcessContextBuilder {
+       if contextMap != nil {
+               p.processContext.SetVariable(engine.VarNameStateMachineContext, 
contextMap)
+       }
+
+       return p
+}
+
+func (p *ProcessContextBuilder) WithIsAsyncExecution(async bool) 
*ProcessContextBuilder {
+       p.processContext.SetVariable(engine.VarNameIsAsyncExecution, async)
+
+       return p
+}
+
+func (p *ProcessContextBuilder) Build() ProcessContext {
+       return p.processContext
+}
diff --git a/pkg/saga/statemachine/engine/process_ctrl/process_type.go 
b/pkg/saga/statemachine/engine/process_ctrl/process_type.go
new file mode 100644
index 00000000..14027a28
--- /dev/null
+++ b/pkg/saga/statemachine/engine/process_ctrl/process_type.go
@@ -0,0 +1,7 @@
+package process_ctrl
+
+type ProcessType string
+
+const (
+       StateLang ProcessType = "STATE_LANG" // SEATA State Language
+)
diff --git 
a/pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go 
b/pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go
new file mode 100644
index 00000000..ce3479c3
--- /dev/null
+++ b/pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go
@@ -0,0 +1,139 @@
+package process_ctrl
+
+import (
+       "context"
+       "github.com/pkg/errors"
+       
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl/instruction"
+       "sync"
+)
+
+type StateHandler interface {
+       State() string
+       ProcessHandler
+}
+
+type StateRouter interface {
+       State() string
+       RouterHandler
+}
+
+type InterceptAbleStateHandler interface {
+       StateHandler
+       StateHandlerInterceptorList() []StateHandlerInterceptor
+       RegistryStateHandlerInterceptor(stateHandlerInterceptor 
StateHandlerInterceptor)
+}
+
+type StateHandlerInterceptor interface {
+       PreProcess(ctx context.Context, processContext ProcessContext) error
+       PostProcess(ctx context.Context, processContext ProcessContext) error
+}
+
+type StateMachineProcessHandler struct {
+       mp map[string]StateHandler
+       mu sync.RWMutex
+}
+
+func NewStateMachineProcessHandler() *StateMachineProcessHandler {
+       return &StateMachineProcessHandler{
+               mp: make(map[string]StateHandler),
+       }
+}
+
+func (s *StateMachineProcessHandler) Process(ctx context.Context, 
processContext ProcessContext) error {
+       stateInstruction, _ := 
processContext.GetInstruction().(instruction.StateInstruction)
+
+       state, err := stateInstruction.GetState(processContext)
+       if err != nil {
+               return err
+       }
+
+       stateType := state.Type()
+       stateHandler := s.GetStateHandler(stateType)
+       if stateHandler == nil {
+               return errors.New("Not support [" + stateType + "] state 
handler")
+       }
+
+       interceptAbleStateHandler, ok := 
stateHandler.(InterceptAbleStateHandler)
+
+       var stateHandlerInterceptorList []StateHandlerInterceptor
+       if ok {
+               stateHandlerInterceptorList = 
interceptAbleStateHandler.StateHandlerInterceptorList()
+       }
+
+       if stateHandlerInterceptorList != nil && 
len(stateHandlerInterceptorList) > 0 {
+               for _, stateHandlerInterceptor := range 
stateHandlerInterceptorList {
+                       err = stateHandlerInterceptor.PreProcess(ctx, 
processContext)
+                       if err != nil {
+                               return err
+                       }
+               }
+       }
+
+       err = stateHandler.Process(ctx, processContext)
+       if err != nil {
+               return err
+       }
+
+       if stateHandlerInterceptorList != nil && 
len(stateHandlerInterceptorList) > 0 {
+               for _, stateHandlerInterceptor := range 
stateHandlerInterceptorList {
+                       err = stateHandlerInterceptor.PostProcess(ctx, 
processContext)
+                       if err != nil {
+                               return err
+                       }
+               }
+       }
+
+       return nil
+}
+
+func (s *StateMachineProcessHandler) GetStateHandler(stateType string) 
StateHandler {
+       s.mu.RLock()
+       defer s.mu.RUnlock()
+       return s.mp[stateType]
+}
+
+func (s *StateMachineProcessHandler) RegistryStateHandler(stateType string, 
stateHandler StateHandler) {
+       s.mu.Lock()
+       defer s.mu.Unlock()
+       if s.mp == nil {
+               s.mp = make(map[string]StateHandler)
+       }
+       s.mp[stateType] = stateHandler
+}
+
+type StateMachineRouterHandler struct {
+       mu sync.RWMutex
+       mp map[string]StateRouter
+}
+
+func (s *StateMachineRouterHandler) Route(ctx context.Context, processContext 
ProcessContext) error {
+       stateInstruction, _ := 
processContext.GetInstruction().(instruction.StateInstruction)
+
+       state, err := stateInstruction.GetState(processContext)
+       if err != nil {
+               return err
+       }
+
+       stateType := state.Type()
+       stateRouter := s.GetStateRouter(stateType)
+       if stateRouter == nil {
+               return errors.New("Not support [" + stateType + "] state 
router")
+       }
+
+       return stateRouter.Route(ctx, processContext)
+}
+
+func (s *StateMachineRouterHandler) GetStateRouter(stateType string) 
StateRouter {
+       s.mu.RLock()
+       defer s.mu.RUnlock()
+       return s.mp[stateType]
+}
+
+func (s *StateMachineRouterHandler) RegistryStateRouter(stateType string, 
stateRouter StateRouter) {
+       s.mu.Lock()
+       defer s.mu.Unlock()
+       if s.mp == nil {
+               s.mp = make(map[string]StateRouter)
+       }
+       s.mp[stateType] = stateRouter
+}
diff --git a/pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go 
b/pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go
new file mode 100644
index 00000000..4c115df1
--- /dev/null
+++ b/pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go
@@ -0,0 +1,124 @@
+package engine
+
+import (
+       "context"
+       "github.com/pkg/errors"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/events"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
+       
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl/instruction"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+       "time"
+)
+
+type ProcessCtrlStateMachineEngine struct {
+       StateMachineConfig StateMachineConfig
+}
+
+func (p ProcessCtrlStateMachineEngine) Start(ctx context.Context, 
stateMachineName string, tenantId string, startParams map[string]interface{}) 
(statelang.StateMachineInstance, error) {
+       return p.startInternal(ctx, stateMachineName, tenantId, "", 
startParams, false, nil)
+}
+
+func (p ProcessCtrlStateMachineEngine) startInternal(ctx context.Context, 
stateMachineName string, tenantId string, businessKey string, startParams 
map[string]interface{}, async bool, callback CallBack) 
(statelang.StateMachineInstance, error) {
+       if tenantId == "" {
+               tenantId = p.StateMachineConfig.DefaultTenantId()
+       }
+
+       stateMachineInstance, err := p.createMachineInstance(stateMachineName, 
tenantId, businessKey, startParams)
+       if err != nil {
+               return nil, err
+       }
+
+       // Build the process_ctrl context.
+       processContextBuilder := process_ctrl.NewProcessContextBuilder().
+               WithProcessType(process_ctrl.StateLang).
+               WithOperationName(OperationNameStart).
+               WithAsyncCallback(callback).
+               
WithInstruction(instruction.NewStateInstruction(stateMachineName, tenantId)).
+               WithStateMachineInstance(stateMachineInstance).
+               WithStateMachineConfig(p.StateMachineConfig).
+               WithStateMachineEngine(p).
+               WithIsAsyncExecution(async)
+
+       contextMap := p.copyMap(startParams)
+
+       stateMachineInstance.SetContext(contextMap)
+
+       processContext := 
processContextBuilder.WithStateMachineContextVariables(contextMap).Build()
+
+       if stateMachineInstance.StateMachine().IsPersist() && 
p.StateMachineConfig.StateLogStore() != nil {
+               err := 
p.StateMachineConfig.StateLogStore().RecordStateMachineStarted(ctx, 
stateMachineInstance, processContext)
+               if err != nil {
+                       return nil, err
+               }
+       }
+
+       if stateMachineInstance.ID() == "" {
+               
stateMachineInstance.SetID(p.StateMachineConfig.SeqGenerator().GenerateId(SeqEntityStateMachineInst,
 ""))
+       }
+
+       var eventPublisher events.EventPublisher
+       if async {
+               eventPublisher = p.StateMachineConfig.AsyncEventPublisher()
+       } else {
+               eventPublisher = p.StateMachineConfig.EventPublisher()
+       }
+
+       _, err = eventPublisher.PushEvent(ctx, processContext)
+       if err != nil {
+               return nil, err
+       }
+
+       return stateMachineInstance, nil
+}
+
+// copyMap not deep copy, so best practice: Don’t pass by reference
+func (p ProcessCtrlStateMachineEngine) copyMap(startParams 
map[string]interface{}) map[string]interface{} {
+       copyMap := make(map[string]interface{}, len(startParams))
+       for k, v := range startParams {
+               copyMap[k] = v
+       }
+       return copyMap
+}
+
+func (p ProcessCtrlStateMachineEngine) createMachineInstance(stateMachineName 
string, tenantId string, businessKey string, startParams 
map[string]interface{}) (statelang.StateMachineInstance, error) {
+       stateMachine, err := 
p.StateMachineConfig.StateMachineRepository().GetLastVersionStateMachine(stateMachineName,
 tenantId)
+       if err != nil {
+               return nil, err
+       }
+
+       if stateMachine == nil {
+               return nil, errors.New("StateMachine [" + stateMachineName + "] 
is not exists")
+       }
+
+       stateMachineInstance := statelang.NewStateMachineInstanceImpl()
+       stateMachineInstance.SetStateMachine(stateMachine)
+       stateMachineInstance.SetTenantID(tenantId)
+       stateMachineInstance.SetBusinessKey(businessKey)
+       stateMachineInstance.SetStartParams(startParams)
+       if startParams != nil {
+               if businessKey != "" {
+                       startParams[VarNameBusinesskey] = businessKey
+               }
+
+               if startParams[VarNameParentId] != nil {
+                       parentId, ok := startParams[VarNameParentId].(string)
+                       if !ok {
+
+                       }
+                       stateMachineInstance.SetParentID(parentId)
+                       delete(startParams, VarNameParentId)
+               }
+       }
+
+       stateMachineInstance.SetStatus(statelang.RU)
+       stateMachineInstance.SetRunning(true)
+
+       now := time.Now()
+       stateMachineInstance.SetStartedTime(now)
+       stateMachineInstance.SetUpdatedTime(now)
+       return stateMachineInstance, nil
+}
+
+func NewProcessCtrlStateMachineEngine(stateMachineConfig StateMachineConfig) 
*ProcessCtrlStateMachineEngine {
+       return &ProcessCtrlStateMachineEngine{StateMachineConfig: 
stateMachineConfig}
+}
diff --git a/pkg/saga/statemachine/engine/sequence/sequence.go 
b/pkg/saga/statemachine/engine/sequence/sequence.go
new file mode 100644
index 00000000..1a7fdae7
--- /dev/null
+++ b/pkg/saga/statemachine/engine/sequence/sequence.go
@@ -0,0 +1,5 @@
+package sequence
+
+type SeqGenerator interface {
+       GenerateId(entity string, ruleName string) string
+}
diff --git a/pkg/saga/statemachine/engine/sequence/uuid.go 
b/pkg/saga/statemachine/engine/sequence/uuid.go
new file mode 100644
index 00000000..f3060dcf
--- /dev/null
+++ b/pkg/saga/statemachine/engine/sequence/uuid.go
@@ -0,0 +1,14 @@
+package sequence
+
+import "github.com/google/uuid"
+
+type UUIDSeqGenerator struct {
+}
+
+func NewUUIDSeqGenerator() *UUIDSeqGenerator {
+       return &UUIDSeqGenerator{}
+}
+
+func (U UUIDSeqGenerator) GenerateId(entity string, ruleName string) string {
+       return uuid.New().String()
+}
diff --git a/pkg/saga/statemachine/engine/statemachine_config.go 
b/pkg/saga/statemachine/engine/statemachine_config.go
new file mode 100644
index 00000000..3f3c530b
--- /dev/null
+++ b/pkg/saga/statemachine/engine/statemachine_config.go
@@ -0,0 +1,44 @@
+package engine
+
+import (
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/events"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/expr"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/invoker"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/status_decision"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/store"
+)
+
+type StateMachineConfig interface {
+       StateLogRepository() store.StateLogRepository
+
+       StateMachineRepository() store.StateMachineRepository
+
+       StateLogStore() store.StateLogStore
+
+       StateLangStore() store.StateLangStore
+
+       ExpressionFactoryManager() expr.ExpressionFactoryManager
+
+       ExpressionResolver() expr.ExpressionResolver
+
+       SeqGenerator() sequence.SeqGenerator
+
+       StatusDecisionStrategy() status_decision.StatusDecisionStrategy
+
+       EventPublisher() events.EventPublisher
+
+       AsyncEventPublisher() events.EventPublisher
+
+       ServiceInvokerManager() invoker.ServiceInvokerManager
+
+       ScriptInvokerManager() invoker.ScriptInvokerManager
+
+       CharSet() string
+
+       DefaultTenantId() string
+
+       TransOperationTimeout() int
+
+       ServiceInvokeTimeout() int
+}
diff --git a/pkg/saga/statemachine/engine/statemachine_engine.go 
b/pkg/saga/statemachine/engine/statemachine_engine.go
new file mode 100644
index 00000000..6b3954c1
--- /dev/null
+++ b/pkg/saga/statemachine/engine/statemachine_engine.go
@@ -0,0 +1,16 @@
+package engine
+
+import (
+       "context"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+)
+
+type StateMachineEngine interface {
+       Start(ctx context.Context, stateMachineName string, tenantId string, 
startParams map[string]interface{}) (statelang.StateMachineInstance, error)
+}
+
+type CallBack interface {
+       OnFinished(ctx context.Context, context process_ctrl.ProcessContext, 
stateMachineInstance statelang.StateMachineInstance)
+       OnError(ctx context.Context, context process_ctrl.ProcessContext, 
stateMachineInstance statelang.StateMachineInstance, err error)
+}
diff --git a/pkg/saga/statemachine/engine/status_decision/status_decision.go 
b/pkg/saga/statemachine/engine/status_decision/status_decision.go
new file mode 100644
index 00000000..6def093e
--- /dev/null
+++ b/pkg/saga/statemachine/engine/status_decision/status_decision.go
@@ -0,0 +1,4 @@
+package status_decision
+
+type StatusDecisionStrategy interface {
+}
diff --git a/pkg/saga/statemachine/engine/store/statemachine_store.go 
b/pkg/saga/statemachine/engine/store/statemachine_store.go
new file mode 100644
index 00000000..8959df92
--- /dev/null
+++ b/pkg/saga/statemachine/engine/store/statemachine_store.go
@@ -0,0 +1,60 @@
+package store
+
+import (
+       "context"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+       "io"
+)
+
+type StateLogRepository interface {
+       GetStateMachineInstance(stateMachineInstanceId string) 
(statelang.StateInstance, error)
+
+       GetStateMachineInstanceByBusinessKey(businessKey string, tenantId 
string) (statelang.StateInstance, error)
+
+       GetStateMachineInstanceByParentId(parentId string) 
([]statelang.StateMachineInstance, error)
+
+       GetStateInstance(stateInstanceId string, stateMachineInstanceId string) 
(statelang.StateInstance, error)
+
+       GetStateInstanceListByMachineInstanceId(stateMachineInstanceId string) 
([]statelang.StateInstance, error)
+}
+
+type StateLogStore interface {
+       RecordStateMachineStarted(ctx context.Context, machineInstance 
statelang.StateMachineInstance, context process_ctrl.ProcessContext) error
+
+       RecordStateMachineFinished(ctx context.Context, machineInstance 
statelang.StateMachineInstance, context process_ctrl.ProcessContext) error
+
+       RecordStateMachineRestarted(ctx context.Context, machineInstance 
statelang.StateMachineInstance, context process_ctrl.ProcessContext) error
+
+       RecordStateStarted(ctx context.Context, stateInstance 
statelang.StateInstance, context process_ctrl.ProcessContext) error
+
+       RecordStateFinished(ctx context.Context, stateInstance 
statelang.StateInstance, context process_ctrl.ProcessContext) error
+
+       GetStateMachineInstance(stateMachineInstanceId string) 
(statelang.StateInstance, error)
+
+       GetStateMachineInstanceByBusinessKey(businessKey string, tenantId 
string) (statelang.StateInstance, error)
+
+       GetStateMachineInstanceByParentId(parentId string) 
([]statelang.StateMachineInstance, error)
+
+       GetStateInstance(stateInstanceId string, stateMachineInstanceId string) 
(statelang.StateInstance, error)
+
+       GetStateInstanceListByMachineInstanceId(stateMachineInstanceId string) 
([]statelang.StateInstance, error)
+}
+
+type StateMachineRepository interface {
+       GetStateMachineById(stateMachineId string) (statelang.StateMachine, 
error)
+
+       GetLastVersionStateMachine(stateMachineName string, tenantId string) 
(statelang.StateMachine, error)
+
+       RegistryStateMachine(statelang.StateMachine) error
+
+       RegistryStateMachineByReader(reader io.Reader) error
+}
+
+type StateLangStore interface {
+       GetStateMachineById(stateMachineId string) (statelang.StateMachine, 
error)
+
+       GetLastVersionStateMachine(stateMachineName string, tenantId string) 
(statelang.StateMachine, error)
+
+       StoreStateMachine(stateMachine statelang.StateMachine) error
+}
diff --git a/pkg/saga/statemachine/statelang/parser/choice_state_json_parser.go 
b/pkg/saga/statemachine/statelang/parser/choice_state_json_parser.go
new file mode 100644
index 00000000..04729c57
--- /dev/null
+++ b/pkg/saga/statemachine/statelang/parser/choice_state_json_parser.go
@@ -0,0 +1,72 @@
+package parser
+
+import (
+       "fmt"
+       "github.com/pkg/errors"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
+)
+
+type ChoiceStateParser struct {
+       BaseStateParser
+}
+
+func NewChoiceStateParser() *ChoiceStateParser {
+       return &ChoiceStateParser{}
+}
+
+func (c ChoiceStateParser) StateType() string {
+       return engine.StateTypeChoice
+}
+
+func (c ChoiceStateParser) Parse(stateName string, stateMap 
map[string]interface{}) (statelang.State, error) {
+       choiceState := state.NewChoiceStateImpl()
+       choiceState.SetName(stateName)
+
+       //parse Type
+       typeName, err := c.GetString(stateName, stateMap, "Type")
+       if err != nil {
+               return nil, err
+       }
+       choiceState.SetType(typeName)
+
+       //parse Default
+       defaultChoice, err := c.GetString(stateName, stateMap, "Default")
+       if err != nil {
+               return nil, err
+       }
+       choiceState.SetDefault(defaultChoice)
+
+       //parse Choices
+       slice, err := c.GetSlice(stateName, stateMap, "Choices")
+       if err != nil {
+               return nil, err
+       }
+
+       var choices []state.Choice
+       for i := range slice {
+               choiceValMap, ok := slice[i].(map[string]interface{})
+               if !ok {
+                       return nil, errors.New(fmt.Sprintf("State [%s] Choices 
element required struct", stateName))
+               }
+
+               choice := state.NewChoiceImpl()
+               expression, err := c.GetString(stateName, choiceValMap, 
"Expression")
+               if err != nil {
+                       return nil, err
+               }
+               choice.SetExpression(expression)
+
+               next, err := c.GetString(stateName, choiceValMap, "Next")
+               if err != nil {
+                       return nil, err
+               }
+               choice.SetNext(next)
+
+               choices = append(choices, choice)
+       }
+       choiceState.SetChoices(choices)
+
+       return choiceState, nil
+}
diff --git a/pkg/saga/statemachine/statelang/parser/statemachine_json_parser.go 
b/pkg/saga/statemachine/statelang/parser/statemachine_json_parser.go
new file mode 100644
index 00000000..c0977034
--- /dev/null
+++ b/pkg/saga/statemachine/statelang/parser/statemachine_json_parser.go
@@ -0,0 +1,100 @@
+package parser
+
+import (
+       "encoding/json"
+       "github.com/pkg/errors"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+)
+
+type JSONStateMachineParser struct {
+}
+
+func NewJSONStateMachineParser() *JSONStateMachineParser {
+       return &JSONStateMachineParser{}
+}
+
+func (stateMachineParser JSONStateMachineParser) GetType() string {
+       return "JSON"
+}
+
+func (stateMachineParser JSONStateMachineParser) Parse(content string) 
(statelang.StateMachine, error) {
+       var stateMachineJsonObject StateMachineJsonObject
+
+       err := json.Unmarshal([]byte(content), &stateMachineJsonObject)
+       if err != nil {
+               return nil, err
+       }
+
+       stateMachine := statelang.NewStateMachineImpl()
+       stateMachine.SetName(stateMachineJsonObject.Name)
+       stateMachine.SetComment(stateMachineJsonObject.Comment)
+       stateMachine.SetVersion(stateMachineJsonObject.Version)
+       stateMachine.SetStartState(stateMachineJsonObject.StartState)
+       stateMachine.SetPersist(stateMachineJsonObject.Persist)
+
+       if stateMachineJsonObject.Type != "" {
+               stateMachine.SetType(stateMachineJsonObject.Type)
+       }
+
+       if stateMachineJsonObject.RecoverStrategy != "" {
+               recoverStrategy, ok := 
statelang.ValueOfRecoverStrategy(stateMachineJsonObject.RecoverStrategy)
+               if !ok {
+                       return nil, errors.New("Not support " + 
stateMachineJsonObject.RecoverStrategy)
+               }
+               stateMachine.SetRecoverStrategy(recoverStrategy)
+       }
+
+       stateParserFactory := NewDefaultStateParserFactory()
+       stateParserFactory.InitDefaultStateParser()
+       for stateName, v := range stateMachineJsonObject.States {
+               stateMap, ok := v.(map[string]interface{})
+               if !ok {
+                       return nil, errors.New("State [" + stateName + "] 
scheme illegal, required map")
+               }
+
+               stateType, ok := stateMap["Type"].(string)
+               if !ok {
+                       return nil, errors.New("State [" + stateName + "] Type 
illegal, required string")
+               }
+
+               //stateMap
+               stateParser := stateParserFactory.GetStateParser(stateType)
+               if stateParser == nil {
+                       return nil, errors.New("State Type [" + stateType + "] 
is not support")
+               }
+
+               _, stateExist := stateMachine.States()[stateName]
+               if stateExist {
+                       return nil, errors.New("State [name:" + stateName + "] 
already exists")
+               }
+
+               state, err := stateParser.Parse(stateName, stateMap)
+               if err != nil {
+                       return nil, err
+               }
+
+               state.SetStateMachine(stateMachine)
+               stateMachine.States()[stateName] = state
+       }
+
+       //TODO setCompensateState
+       //for stateName, state := range stateMachine.GetStates() {
+       //
+       //}
+       //
+
+       return stateMachine, nil
+}
+
+type StateMachineJsonObject struct {
+       Name                        string                 `json:"Name"`
+       Comment                     string                 `json:"Comment"`
+       Version                     string                 `json:"Version"`
+       StartState                  string                 `json:"StartState"`
+       RecoverStrategy             string                 
`json:"RecoverStrategy"`
+       Persist                     bool                   `json:"IsPersist"`
+       RetryPersistModeUpdate      bool                   
`json:"IsRetryPersistModeUpdate"`
+       CompensatePersistModeUpdate bool                   
`json:"IsCompensatePersistModeUpdate"`
+       Type                        string                 `json:"Type"`
+       States                      map[string]interface{} `json:"States"`
+}
diff --git 
a/pkg/saga/statemachine/statelang/parser/statemachine_json_parser_test.go 
b/pkg/saga/statemachine/statelang/parser/statemachine_json_parser_test.go
new file mode 100644
index 00000000..c6d00b75
--- /dev/null
+++ b/pkg/saga/statemachine/statelang/parser/statemachine_json_parser_test.go
@@ -0,0 +1,13 @@
+package parser
+
+import (
+       "testing"
+)
+
+func TestParseChoice(t *testing.T) {
+       var content = "{\n    \"Name\":\"ChoiceTest\",\n    
\"Comment\":\"ChoiceTest\",\n    \"StartState\":\"ChoiceState\",\n    
\"Version\":\"0.0.1\",\n    \"States\":{\n        \"ChoiceState\":{\n           
 \"Type\":\"Choice\",\n            \"Choices\":[\n                {\n           
         \"Expression\":\"[a] == 1\",\n                    
\"Next\":\"SecondState\"\n                },\n                {\n               
     \"Expression\":\"[a] == 2\",\n                    \"Next\":\"Thir [...]
+       _, err := NewJSONStateMachineParser().Parse(content)
+       if err != nil {
+               t.Error("parse fail: " + err.Error())
+       }
+}
diff --git a/pkg/saga/statemachine/statelang/parser/statemachine_parser.go 
b/pkg/saga/statemachine/statelang/parser/statemachine_parser.go
new file mode 100644
index 00000000..99ee95e1
--- /dev/null
+++ b/pkg/saga/statemachine/statelang/parser/statemachine_parser.go
@@ -0,0 +1,104 @@
+package parser
+
+import (
+       "github.com/pkg/errors"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+       "sync"
+)
+
+type StateMachineParser interface {
+       GetType() string
+       Parse(content string) (statelang.StateMachine, error)
+}
+
+type StateParser interface {
+       StateType() string
+       Parse(stateName string, stateMap map[string]interface{}) 
(statelang.State, error)
+}
+
+type BaseStateParser struct {
+}
+
+func (b BaseStateParser) ParseBaseAttributes(stateName string, state 
statelang.State, stateMap map[string]interface{}) error {
+       state.SetName(stateName)
+
+       comment, err := b.GetString(stateName, stateMap, "Comment")
+       if err != nil {
+               return err
+       }
+       state.SetComment(comment)
+
+       next, err := b.GetString(stateName, stateMap, "Next")
+       if err != nil {
+               return err
+       }
+
+       state.SetNext(next)
+       return nil
+}
+
+func (b BaseStateParser) GetString(stateName string, stateMap 
map[string]interface{}, key string) (string, error) {
+       value := stateMap[key]
+       if value == nil {
+               var result string
+               return result, errors.New("State [" + stateName + "] " + key + 
" not exist")
+       }
+
+       valueAsString, ok := value.(string)
+       if !ok {
+               var s string
+               return s, errors.New("State [" + stateName + "] " + key + " 
illegal, required string")
+       }
+       return valueAsString, nil
+}
+
+func (b BaseStateParser) GetSlice(stateName string, stateMap 
map[string]interface{}, key string) ([]interface{}, error) {
+       value := stateMap[key]
+
+       if value == nil {
+               var result []interface{}
+               return result, errors.New("State [" + stateName + "] " + key + 
" not exist")
+       }
+
+       valueAsSlice, ok := value.([]interface{})
+       if !ok {
+               var result []interface{}
+               return result, errors.New("State [" + stateName + "] " + key + 
" illegal, required slice")
+       }
+       return valueAsSlice, nil
+}
+
+type StateParserFactory interface {
+       RegistryStateParser(stateType string, stateParser StateParser)
+
+       GetStateParser(stateType string) StateParser
+}
+
+type DefaultStateParserFactory struct {
+       stateParserMap map[string]StateParser
+       mutex          sync.Mutex
+}
+
+func NewDefaultStateParserFactory() *DefaultStateParserFactory {
+       var stateParserMap map[string]StateParser = make(map[string]StateParser)
+       return &DefaultStateParserFactory{
+               stateParserMap: stateParserMap,
+       }
+}
+
+// InitDefaultStateParser init StateParser by default
+func (d *DefaultStateParserFactory) InitDefaultStateParser() {
+       choiceStateParser := NewChoiceStateParser()
+
+       d.RegistryStateParser(choiceStateParser.StateType(), choiceStateParser)
+}
+
+func (d *DefaultStateParserFactory) RegistryStateParser(stateType string, 
stateParser StateParser) {
+       d.mutex.Lock()
+       defer d.mutex.Unlock()
+       d.stateParserMap[stateType] = stateParser
+}
+
+func (d *DefaultStateParserFactory) GetStateParser(stateType string) 
StateParser {
+       return d.stateParserMap[stateType]
+}
diff --git a/pkg/saga/statemachine/statelang/state.go 
b/pkg/saga/statemachine/statelang/state.go
new file mode 100644
index 00000000..1e490ac0
--- /dev/null
+++ b/pkg/saga/statemachine/statelang/state.go
@@ -0,0 +1,71 @@
+package statelang
+
+type State interface {
+       Name() string
+
+       SetName(name string)
+
+       Comment() string
+
+       SetComment(comment string)
+
+       Type() string
+
+       SetType(typeName string)
+
+       Next() string
+
+       SetNext(next string)
+
+       StateMachine() StateMachine
+
+       SetStateMachine(machine StateMachine)
+}
+
+type BaseState struct {
+       name         string `alias:"Name"`
+       comment      string `alias:"Comment"`
+       typeName     string `alias:"Type"`
+       next         string `alias:"Next"`
+       stateMachine StateMachine
+}
+
+func (b *BaseState) Name() string {
+       return b.name
+}
+
+func (b *BaseState) SetName(name string) {
+       b.name = name
+}
+
+func (b *BaseState) Comment() string {
+       return b.comment
+}
+
+func (b *BaseState) SetComment(comment string) {
+       b.comment = comment
+}
+
+func (b *BaseState) Type() string {
+       return b.typeName
+}
+
+func (b *BaseState) SetType(typeName string) {
+       b.typeName = typeName
+}
+
+func (b *BaseState) Next() string {
+       return b.next
+}
+
+func (b *BaseState) SetNext(next string) {
+       b.next = next
+}
+
+func (b *BaseState) StateMachine() StateMachine {
+       return b.stateMachine
+}
+
+func (b *BaseState) SetStateMachine(machine StateMachine) {
+       b.stateMachine = machine
+}
diff --git a/pkg/saga/statemachine/statelang/state/choice_state.go 
b/pkg/saga/statemachine/statelang/state/choice_state.go
new file mode 100644
index 00000000..e1cd973c
--- /dev/null
+++ b/pkg/saga/statemachine/statelang/state/choice_state.go
@@ -0,0 +1,74 @@
+package state
+
+import "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+
+type ChoiceState interface {
+       statelang.State
+
+       Choices() []Choice
+
+       Default() string
+}
+
+type Choice interface {
+       Expression() string
+
+       SetExpression(expression string)
+
+       Next() string
+
+       SetNext(next string)
+}
+
+type ChoiceStateImpl struct {
+       statelang.BaseState
+       defaultChoice string   `alias:"Default"`
+       choices       []Choice `alias:"Choices"`
+}
+
+func NewChoiceStateImpl() *ChoiceStateImpl {
+       return &ChoiceStateImpl{
+               choices: make([]Choice, 0),
+       }
+}
+
+func (choiceState *ChoiceStateImpl) Default() string {
+       return choiceState.defaultChoice
+}
+
+func (choiceState *ChoiceStateImpl) Choices() []Choice {
+       return choiceState.choices
+}
+
+func (choiceState *ChoiceStateImpl) SetDefault(defaultChoice string) {
+       choiceState.defaultChoice = defaultChoice
+}
+
+func (choiceState *ChoiceStateImpl) SetChoices(choices []Choice) {
+       choiceState.choices = choices
+}
+
+type ChoiceImpl struct {
+       expression string
+       next       string
+}
+
+func NewChoiceImpl() *ChoiceImpl {
+       return &ChoiceImpl{}
+}
+
+func (c *ChoiceImpl) Expression() string {
+       return c.expression
+}
+
+func (c *ChoiceImpl) SetExpression(expression string) {
+       c.expression = expression
+}
+
+func (c *ChoiceImpl) Next() string {
+       return c.next
+}
+
+func (c *ChoiceImpl) SetNext(next string) {
+       c.next = next
+}
diff --git a/pkg/saga/statemachine/statelang/state/task_state.go 
b/pkg/saga/statemachine/statelang/state/task_state.go
new file mode 100644
index 00000000..a9164d35
--- /dev/null
+++ b/pkg/saga/statemachine/statelang/state/task_state.go
@@ -0,0 +1,30 @@
+package state
+
+import (
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+)
+
+type TaskState interface {
+       statelang.State
+
+       CompensateState() string
+
+       Status() map[string]string
+
+       Retry() []Retry
+}
+
+type Retry interface {
+       ErrorTypeNames() []string
+
+       IntervalSecond() float64
+
+       MaxAttempt() int
+
+       BackoffRate() float64
+}
+
+type ServiceTaskState interface {
+       TaskState
+       //TODO add serviceTask
+}
diff --git a/pkg/saga/statemachine/statelang/state_instance.go 
b/pkg/saga/statemachine/statelang/state_instance.go
new file mode 100644
index 00000000..826faca6
--- /dev/null
+++ b/pkg/saga/statemachine/statelang/state_instance.go
@@ -0,0 +1,330 @@
+package statelang
+
+import "time"
+
+type StateInstance interface {
+       ID() string
+
+       SetID(id string)
+
+       Name() string
+
+       SetName(name string)
+
+       Type() string
+
+       SetType(typeName string)
+
+       ServiceName() string
+
+       SetServiceName(serviceName string)
+
+       ServiceMethod() string
+
+       SetServiceMethod(serviceMethod string)
+
+       ServiceType() string
+
+       SetServiceType(serviceType string)
+
+       BusinessKey() string
+
+       SetBusinessKey(businessKey string)
+
+       StartedTime() time.Time
+
+       SetStartedTime(startedTime time.Time)
+
+       UpdatedTime() time.Time
+
+       SetUpdatedTime(updateTime time.Time)
+
+       EndTime() time.Time
+
+       SetEndTime(endTime time.Time)
+
+       IsForUpdate() bool
+
+       SetForUpdate(forUpdate bool)
+
+       Error() error
+
+       SetError(err error)
+
+       InputParams() interface{}
+
+       SetInputParams(inputParams interface{})
+
+       OutputParams() interface{}
+
+       SetOutputParams(outputParams interface{})
+
+       Status() ExecutionStatus
+
+       SetStatus(status ExecutionStatus)
+
+       StateIDCompensatedFor() string
+
+       SetStateIDCompensatedFor(stateIdCompensatedFor string)
+
+       StateIDRetriedFor() string
+
+       SetStateIDRetriedFor(stateIdRetriedFor string)
+
+       CompensationState() StateInstance
+
+       SetCompensationState(compensationState StateInstance)
+
+       StateMachineInstance() StateMachineInstance
+
+       SetStateMachineInstance(stateMachineInstance StateMachineInstance)
+
+       IsIgnoreStatus() bool
+
+       SetIgnoreStatus(ignoreStatus bool)
+
+       IsForCompensation() bool
+
+       SerializedInputParams() interface{}
+
+       SetSerializedInputParams(serializedInputParams interface{})
+
+       SerializedOutputParams() interface{}
+
+       SetSerializedOutputParams(serializedOutputParams interface{})
+
+       SerializedError() interface{}
+
+       SetSerializedError(serializedErr interface{})
+
+       CompensationStatus() ExecutionStatus
+}
+
+type StateInstanceImpl struct {
+       id                     string
+       machineInstanceId      string
+       name                   string
+       typeName               string
+       serviceName            string
+       serviceMethod          string
+       serviceType            string
+       businessKey            string
+       startedTime            time.Time
+       updatedTime            time.Time
+       endTime                time.Time
+       isForUpdate            bool
+       err                    error
+       serializedErr          interface{}
+       inputParams            interface{}
+       serializedInputParams  interface{}
+       outputParams           interface{}
+       serializedOutputParams interface{}
+       status                 ExecutionStatus
+       stateIdCompensatedFor  string
+       stateIdRetriedFor      string
+       compensationState      StateInstance
+       stateMachineInstance   StateMachineInstance
+       ignoreStatus           bool
+}
+
+func NewStateInstanceImpl() *StateInstanceImpl {
+       return &StateInstanceImpl{}
+}
+
+func (s *StateInstanceImpl) ID() string {
+       return s.id
+}
+
+func (s *StateInstanceImpl) SetID(id string) {
+       s.id = id
+}
+
+func (s *StateInstanceImpl) Name() string {
+       return s.name
+}
+
+func (s *StateInstanceImpl) SetName(name string) {
+       s.name = name
+}
+
+func (s *StateInstanceImpl) Type() string {
+       return s.typeName
+}
+
+func (s *StateInstanceImpl) SetType(typeName string) {
+       s.typeName = typeName
+}
+
+func (s *StateInstanceImpl) ServiceName() string {
+       return s.serviceName
+}
+
+func (s *StateInstanceImpl) SetServiceName(serviceName string) {
+       s.serviceName = serviceName
+}
+
+func (s *StateInstanceImpl) ServiceMethod() string {
+       return s.serviceMethod
+}
+
+func (s *StateInstanceImpl) SetServiceMethod(serviceMethod string) {
+       s.serviceMethod = serviceMethod
+}
+
+func (s *StateInstanceImpl) ServiceType() string {
+       return s.serviceType
+}
+
+func (s *StateInstanceImpl) SetServiceType(serviceType string) {
+       s.serviceType = serviceType
+}
+
+func (s *StateInstanceImpl) BusinessKey() string {
+       return s.businessKey
+}
+
+func (s *StateInstanceImpl) SetBusinessKey(businessKey string) {
+       s.businessKey = businessKey
+}
+
+func (s *StateInstanceImpl) StartedTime() time.Time {
+       return s.startedTime
+}
+
+func (s *StateInstanceImpl) SetStartedTime(startedTime time.Time) {
+       s.startedTime = startedTime
+}
+
+func (s *StateInstanceImpl) UpdatedTime() time.Time {
+       return s.updatedTime
+}
+
+func (s *StateInstanceImpl) SetUpdatedTime(updatedTime time.Time) {
+       s.updatedTime = updatedTime
+}
+
+func (s *StateInstanceImpl) EndTime() time.Time {
+       return s.endTime
+}
+
+func (s *StateInstanceImpl) SetEndTime(endTime time.Time) {
+       s.endTime = endTime
+}
+
+func (s *StateInstanceImpl) IsForUpdate() bool {
+       return s.isForUpdate
+}
+
+func (s *StateInstanceImpl) SetForUpdate(forUpdate bool) {
+       s.isForUpdate = forUpdate
+}
+
+func (s *StateInstanceImpl) Error() error {
+       return s.err
+}
+
+func (s *StateInstanceImpl) SetError(err error) {
+       s.err = err
+}
+
+func (s *StateInstanceImpl) InputParams() interface{} {
+       return s.inputParams
+}
+
+func (s *StateInstanceImpl) SetInputParams(inputParams interface{}) {
+       s.inputParams = inputParams
+}
+
+func (s *StateInstanceImpl) OutputParams() interface{} {
+       return s.outputParams
+}
+
+func (s *StateInstanceImpl) SetOutputParams(outputParams interface{}) {
+       s.outputParams = outputParams
+}
+
+func (s *StateInstanceImpl) Status() ExecutionStatus {
+       return s.status
+}
+
+func (s *StateInstanceImpl) SetStatus(status ExecutionStatus) {
+       s.status = status
+}
+
+func (s *StateInstanceImpl) StateIDCompensatedFor() string {
+       return s.stateIdCompensatedFor
+}
+
+func (s *StateInstanceImpl) SetStateIDCompensatedFor(stateIdCompensatedFor 
string) {
+       s.stateIdCompensatedFor = stateIdCompensatedFor
+}
+
+func (s *StateInstanceImpl) StateIDRetriedFor() string {
+       return s.stateIdRetriedFor
+}
+
+func (s *StateInstanceImpl) SetStateIDRetriedFor(stateIdRetriedFor string) {
+       s.stateIdRetriedFor = stateIdRetriedFor
+}
+
+func (s *StateInstanceImpl) CompensationState() StateInstance {
+       return s.compensationState
+}
+
+func (s *StateInstanceImpl) SetCompensationState(compensationState 
StateInstance) {
+       s.compensationState = compensationState
+}
+
+func (s *StateInstanceImpl) StateMachineInstance() StateMachineInstance {
+       return s.stateMachineInstance
+}
+
+func (s *StateInstanceImpl) SetStateMachineInstance(stateMachineInstance 
StateMachineInstance) {
+       s.stateMachineInstance = stateMachineInstance
+}
+
+func (s *StateInstanceImpl) IsIgnoreStatus() bool {
+       return s.ignoreStatus
+}
+
+func (s *StateInstanceImpl) SetIgnoreStatus(ignoreStatus bool) {
+       s.ignoreStatus = ignoreStatus
+}
+
+func (s *StateInstanceImpl) IsForCompensation() bool {
+       return s.stateIdCompensatedFor == ""
+}
+
+func (s *StateInstanceImpl) SerializedInputParams() interface{} {
+       return s.serializedInputParams
+}
+
+func (s *StateInstanceImpl) SetSerializedInputParams(serializedInputParams 
interface{}) {
+       s.serializedInputParams = serializedInputParams
+}
+
+func (s *StateInstanceImpl) SerializedOutputParams() interface{} {
+       return s.serializedOutputParams
+}
+
+func (s *StateInstanceImpl) SetSerializedOutputParams(serializedOutputParams 
interface{}) {
+       s.serializedOutputParams = serializedOutputParams
+}
+
+func (s *StateInstanceImpl) SerializedError() interface{} {
+       return s.serializedErr
+}
+
+func (s *StateInstanceImpl) SetSerializedError(serializedErr interface{}) {
+       s.serializedErr = serializedErr
+}
+
+func (s *StateInstanceImpl) CompensationStatus() ExecutionStatus {
+       if s.compensationState != nil {
+               return s.compensationState.Status()
+       }
+
+       //return nil ExecutionStatus
+       var status ExecutionStatus
+       return status
+}
diff --git a/pkg/saga/statemachine/statelang/statemachine.go 
b/pkg/saga/statemachine/statelang/statemachine.go
new file mode 100644
index 00000000..8b13f8d0
--- /dev/null
+++ b/pkg/saga/statemachine/statelang/statemachine.go
@@ -0,0 +1,261 @@
+package statelang
+
+import (
+       "time"
+)
+
+type StateMachineStatus string
+
+const (
+       Active   StateMachineStatus = "Active"
+       Inactive StateMachineStatus = "Inactive"
+)
+
+// RecoverStrategy : Recover Strategy
+type RecoverStrategy string
+
+const (
+       //Compensate stateMachine
+       Compensate RecoverStrategy = "Compensate"
+       // Forward  stateMachine
+       Forward RecoverStrategy = "Forward"
+)
+
+func ValueOfRecoverStrategy(recoverStrategy string) (RecoverStrategy, bool) {
+       switch recoverStrategy {
+       case "Compensate":
+               return Compensate, true
+       case "Forward":
+               return Forward, true
+       default:
+               var recoverStrategy RecoverStrategy
+               return recoverStrategy, false
+       }
+}
+
+type StateMachine interface {
+       ID() string
+
+       SetID(id string)
+
+       Name() string
+
+       SetName(name string)
+
+       Comment() string
+
+       SetComment(comment string)
+
+       StartState() string
+
+       SetStartState(startState string)
+
+       Version() string
+
+       SetVersion(version string)
+
+       States() map[string]State
+
+       State(stateName string) State
+
+       TenantId() string
+
+       SetTenantId(tenantId string)
+
+       AppName() string
+
+       SetAppName(appName string)
+
+       Type() string
+
+       SetType(typeName string)
+
+       Status() StateMachineStatus
+
+       SetStatus(status StateMachineStatus)
+
+       RecoverStrategy() RecoverStrategy
+
+       SetRecoverStrategy(recoverStrategy RecoverStrategy)
+
+       IsPersist() bool
+
+       SetPersist(persist bool)
+
+       IsRetryPersistModeUpdate() bool
+
+       SetRetryPersistModeUpdate(retryPersistModeUpdate bool)
+
+       IsCompensatePersistModeUpdate() bool
+
+       SetCompensatePersistModeUpdate(compensatePersistModeUpdate bool)
+
+       Content() string
+
+       SetContent(content string)
+
+       CreateTime() time.Time
+
+       SetCreateTime(createTime time.Time)
+}
+
+type StateMachineImpl struct {
+       id                          string
+       tenantId                    string
+       appName                     string
+       name                        string
+       comment                     string
+       version                     string
+       startState                  string
+       status                      StateMachineStatus
+       recoverStrategy             RecoverStrategy
+       persist                     bool
+       retryPersistModeUpdate      bool
+       compensatePersistModeUpdate bool
+       typeName                    string
+       content                     string
+       createTime                  time.Time
+       states                      map[string]State
+}
+
+func NewStateMachineImpl() *StateMachineImpl {
+       stateMap := make(map[string]State)
+       return &StateMachineImpl{
+               appName:  "SEATA",
+               status:   Active,
+               typeName: "STATE_LANG",
+               states:   stateMap,
+       }
+}
+
+func (s *StateMachineImpl) ID() string {
+       return s.id
+}
+
+func (s *StateMachineImpl) SetID(id string) {
+       s.id = id
+}
+
+func (s *StateMachineImpl) Name() string {
+       return s.name
+}
+
+func (s *StateMachineImpl) SetName(name string) {
+       s.name = name
+}
+
+func (s *StateMachineImpl) SetComment(comment string) {
+       s.comment = comment
+}
+
+func (s *StateMachineImpl) Comment() string {
+       return s.comment
+}
+
+func (s *StateMachineImpl) StartState() string {
+       return s.startState
+}
+
+func (s *StateMachineImpl) SetStartState(startState string) {
+       s.startState = startState
+}
+
+func (s *StateMachineImpl) Version() string {
+       return s.version
+}
+
+func (s *StateMachineImpl) SetVersion(version string) {
+       s.version = version
+}
+
+func (s *StateMachineImpl) States() map[string]State {
+       return s.states
+}
+
+func (s *StateMachineImpl) State(stateName string) State {
+       if s.states == nil {
+               return nil
+       }
+
+       return s.states[stateName]
+}
+
+func (s *StateMachineImpl) TenantId() string {
+       return s.tenantId
+}
+
+func (s *StateMachineImpl) SetTenantId(tenantId string) {
+       s.tenantId = tenantId
+}
+
+func (s *StateMachineImpl) AppName() string {
+       return s.appName
+}
+
+func (s *StateMachineImpl) SetAppName(appName string) {
+       s.appName = appName
+}
+
+func (s *StateMachineImpl) Type() string {
+       return s.typeName
+}
+
+func (s *StateMachineImpl) SetType(typeName string) {
+       s.typeName = typeName
+}
+
+func (s *StateMachineImpl) Status() StateMachineStatus {
+       return s.status
+}
+
+func (s *StateMachineImpl) SetStatus(status StateMachineStatus) {
+       s.status = status
+}
+
+func (s *StateMachineImpl) RecoverStrategy() RecoverStrategy {
+       return s.recoverStrategy
+}
+
+func (s *StateMachineImpl) SetRecoverStrategy(recoverStrategy RecoverStrategy) 
{
+       s.recoverStrategy = recoverStrategy
+}
+
+func (s *StateMachineImpl) IsPersist() bool {
+       return s.persist
+}
+
+func (s *StateMachineImpl) SetPersist(persist bool) {
+       s.persist = persist
+}
+
+func (s *StateMachineImpl) IsRetryPersistModeUpdate() bool {
+       return s.retryPersistModeUpdate
+}
+
+func (s *StateMachineImpl) SetRetryPersistModeUpdate(retryPersistModeUpdate 
bool) {
+       s.retryPersistModeUpdate = retryPersistModeUpdate
+}
+
+func (s *StateMachineImpl) IsCompensatePersistModeUpdate() bool {
+       return s.compensatePersistModeUpdate
+}
+
+func (s *StateMachineImpl) 
SetCompensatePersistModeUpdate(compensatePersistModeUpdate bool) {
+       s.compensatePersistModeUpdate = compensatePersistModeUpdate
+}
+
+func (s *StateMachineImpl) Content() string {
+       return s.content
+}
+
+func (s *StateMachineImpl) SetContent(content string) {
+       s.content = content
+}
+
+func (s *StateMachineImpl) CreateTime() time.Time {
+       return s.createTime
+}
+
+func (s *StateMachineImpl) SetCreateTime(createTime time.Time) {
+       s.createTime = createTime
+}
diff --git a/pkg/saga/statemachine/statelang/statemachine_instance.go 
b/pkg/saga/statemachine/statelang/statemachine_instance.go
new file mode 100644
index 00000000..ff039024
--- /dev/null
+++ b/pkg/saga/statemachine/statelang/statemachine_instance.go
@@ -0,0 +1,316 @@
+package statelang
+
+import (
+       "sync"
+       "time"
+)
+
+type ExecutionStatus string
+
+const (
+       // RU Running
+       RU ExecutionStatus = "RU"
+       // SU Succeed
+       SU ExecutionStatus = "SU"
+       // FA Failed
+       FA ExecutionStatus = "FA"
+       // UN Unknown
+       UN ExecutionStatus = "UN"
+       // SK Skipped
+       SK ExecutionStatus = "SK"
+)
+
+type StateMachineInstance interface {
+       ID() string
+
+       SetID(id string)
+
+       MachineID() string
+
+       SetMachineID(machineID string)
+
+       TenantID() string
+
+       SetTenantID(tenantID string)
+
+       ParentID() string
+
+       SetParentID(parentID string)
+
+       StartedTime() time.Time
+
+       SetStartedTime(startedTime time.Time)
+
+       EndTime() time.Time
+
+       SetEndTime(endTime time.Time)
+
+       StateList() []StateInstance
+
+       State(stateId string) StateInstance
+
+       PutState(stateId string, stateInstance StateInstance)
+
+       Status() ExecutionStatus
+
+       SetStatus(status ExecutionStatus)
+
+       CompensationStatus() ExecutionStatus
+
+       SetCompensationStatus(compensationStatus ExecutionStatus)
+
+       IsRunning() bool
+
+       SetRunning(isRunning bool)
+
+       UpdatedTime() time.Time
+
+       SetUpdatedTime(updatedTime time.Time)
+
+       BusinessKey() string
+
+       SetBusinessKey(businessKey string)
+
+       Error() error
+
+       SetError(err error)
+
+       StartParams() map[string]interface{}
+
+       SetStartParams(startParams map[string]interface{})
+
+       EndParams() map[string]interface{}
+
+       SetEndParams(endParams map[string]interface{})
+
+       PutContext(key string, value interface{})
+
+       SetContext(context map[string]interface{})
+
+       StateMachine() StateMachine
+
+       SetStateMachine(stateMachine StateMachine)
+
+       SerializedStartParams() interface{}
+
+       SetSerializedStartParams(serializedStartParams interface{})
+
+       SerializedEndParams() interface{}
+
+       SetSerializedEndParams(serializedEndParams interface{})
+
+       SerializedError() interface{}
+
+       SetSerializedError(serializedError interface{})
+}
+
+type StateMachineInstanceImpl struct {
+       id                    string
+       machineId             string
+       tenantId              string
+       parentId              string
+       businessKey           string
+       startParams           map[string]interface{}
+       serializedStartParams interface{}
+       startedTime           time.Time
+       endTime               time.Time
+       updatedTime           time.Time
+       err                   error
+       serializedError       interface{}
+       endParams             map[string]interface{}
+       serializedEndParams   interface{}
+       status                ExecutionStatus
+       compensationStatus    ExecutionStatus
+       isRunning             bool
+       context               map[string]interface{}
+       stateMachine          StateMachine
+       stateList             []StateInstance
+       stateMap              map[string]StateInstance
+
+       contextMutex sync.RWMutex // Mutex to protect concurrent access to 
context
+       stateMutex   sync.RWMutex // Mutex to protect concurrent access to 
stateList and stateMap
+}
+
+func NewStateMachineInstanceImpl() *StateMachineInstanceImpl {
+       return &StateMachineInstanceImpl{
+               startParams: make(map[string]interface{}),
+               endParams:   make(map[string]interface{}),
+               stateList:   make([]StateInstance, 0),
+               stateMap:    make(map[string]StateInstance)}
+}
+
+func (s *StateMachineInstanceImpl) ID() string {
+       return s.id
+}
+
+func (s *StateMachineInstanceImpl) SetID(id string) {
+       s.id = id
+}
+
+func (s *StateMachineInstanceImpl) MachineID() string {
+       return s.machineId
+}
+
+func (s *StateMachineInstanceImpl) SetMachineID(machineID string) {
+       s.machineId = machineID
+}
+
+func (s *StateMachineInstanceImpl) TenantID() string {
+       return s.tenantId
+}
+
+func (s *StateMachineInstanceImpl) SetTenantID(tenantID string) {
+       s.tenantId = tenantID
+}
+
+func (s *StateMachineInstanceImpl) ParentID() string {
+       return s.parentId
+}
+
+func (s *StateMachineInstanceImpl) SetParentID(parentID string) {
+       s.parentId = parentID
+}
+
+func (s *StateMachineInstanceImpl) StartedTime() time.Time {
+       return s.startedTime
+}
+
+func (s *StateMachineInstanceImpl) SetStartedTime(startedTime time.Time) {
+       s.startedTime = startedTime
+}
+
+func (s *StateMachineInstanceImpl) EndTime() time.Time {
+       return s.endTime
+}
+
+func (s *StateMachineInstanceImpl) SetEndTime(endTime time.Time) {
+       s.endTime = endTime
+}
+
+func (s *StateMachineInstanceImpl) StateList() []StateInstance {
+       return s.stateList
+}
+
+func (s *StateMachineInstanceImpl) State(stateId string) StateInstance {
+       s.stateMutex.RLock()
+       defer s.stateMutex.RUnlock()
+
+       return s.stateMap[stateId]
+}
+
+func (s *StateMachineInstanceImpl) PutState(stateId string, stateInstance 
StateInstance) {
+       s.stateMutex.Lock()
+       defer s.stateMutex.Unlock()
+
+       stateInstance.SetStateMachineInstance(s)
+       s.stateMap[stateId] = stateInstance
+       s.stateList = append(s.stateList, stateInstance)
+}
+
+func (s *StateMachineInstanceImpl) Status() ExecutionStatus {
+       return s.status
+}
+
+func (s *StateMachineInstanceImpl) SetStatus(status ExecutionStatus) {
+       s.status = status
+}
+
+func (s *StateMachineInstanceImpl) CompensationStatus() ExecutionStatus {
+       return s.compensationStatus
+}
+
+func (s *StateMachineInstanceImpl) SetCompensationStatus(compensationStatus 
ExecutionStatus) {
+       s.compensationStatus = compensationStatus
+}
+
+func (s *StateMachineInstanceImpl) IsRunning() bool {
+       return s.isRunning
+}
+
+func (s *StateMachineInstanceImpl) SetRunning(isRunning bool) {
+       s.isRunning = isRunning
+}
+
+func (s *StateMachineInstanceImpl) UpdatedTime() time.Time {
+       return s.updatedTime
+}
+
+func (s *StateMachineInstanceImpl) SetUpdatedTime(updatedTime time.Time) {
+       s.updatedTime = updatedTime
+}
+
+func (s *StateMachineInstanceImpl) BusinessKey() string {
+       return s.businessKey
+}
+
+func (s *StateMachineInstanceImpl) SetBusinessKey(businessKey string) {
+       s.businessKey = businessKey
+}
+
+func (s *StateMachineInstanceImpl) Error() error {
+       return s.err
+}
+
+func (s *StateMachineInstanceImpl) SetError(err error) {
+       s.err = err
+}
+
+func (s *StateMachineInstanceImpl) StartParams() map[string]interface{} {
+       return s.startParams
+}
+
+func (s *StateMachineInstanceImpl) SetStartParams(startParams 
map[string]interface{}) {
+       s.startParams = startParams
+}
+
+func (s *StateMachineInstanceImpl) EndParams() map[string]interface{} {
+       return s.endParams
+}
+
+func (s *StateMachineInstanceImpl) SetEndParams(endParams 
map[string]interface{}) {
+       s.endParams = endParams
+}
+
+func (s *StateMachineInstanceImpl) PutContext(key string, value interface{}) {
+       s.contextMutex.Lock()
+       defer s.contextMutex.Unlock()
+
+       s.context[key] = value
+}
+
+func (s *StateMachineInstanceImpl) SetContext(context map[string]interface{}) {
+       s.context = context
+}
+
+func (s *StateMachineInstanceImpl) StateMachine() StateMachine {
+       return s.stateMachine
+}
+
+func (s *StateMachineInstanceImpl) SetStateMachine(stateMachine StateMachine) {
+       s.stateMachine = stateMachine
+       s.machineId = stateMachine.ID()
+}
+
+func (s *StateMachineInstanceImpl) SerializedStartParams() interface{} {
+       return s.serializedStartParams
+}
+
+func (s *StateMachineInstanceImpl) 
SetSerializedStartParams(serializedStartParams interface{}) {
+       s.serializedStartParams = serializedStartParams
+}
+
+func (s *StateMachineInstanceImpl) SerializedEndParams() interface{} {
+       return s.endParams
+}
+
+func (s *StateMachineInstanceImpl) SetSerializedEndParams(serializedEndParams 
interface{}) {
+       s.serializedEndParams = serializedEndParams
+}
+
+func (s *StateMachineInstanceImpl) SerializedError() interface{} {
+       return s.serializedError
+}
+
+func (s *StateMachineInstanceImpl) SetSerializedError(serializedError 
interface{}) {
+       s.serializedError = serializedError
+}
diff --git a/pkg/util/reflectx/unmarkshaler.go 
b/pkg/util/reflectx/unmarkshaler.go
new file mode 100644
index 00000000..73acb276
--- /dev/null
+++ b/pkg/util/reflectx/unmarkshaler.go
@@ -0,0 +1,147 @@
+package reflectx
+
+import (
+       "fmt"
+       "github.com/pkg/errors"
+       "reflect"
+       "unicode"
+)
+
+// MapToStruct some state can use this util to parse
+// TODO 性能测试,性能差的话,直接去解析,不使用反射
+func MapToStruct(stateName string, obj interface{}, stateMap 
map[string]interface{}) error {
+       objVal := reflect.ValueOf(obj)
+       if objVal.Kind() != reflect.Pointer {
+               return errors.New(fmt.Sprintf("State [%s] value required a 
pointer", stateName))
+       }
+
+       structValue := objVal.Elem()
+       if structValue.Kind() != reflect.Struct {
+               return errors.New(fmt.Sprintf("State [%s] value elem required a 
struct", stateName))
+       }
+
+       structType := structValue.Type()
+       for key, value := range stateMap {
+               //Get field, get alias first
+               field, found := getField(structType, key)
+               if !found {
+                       continue
+               }
+
+               fieldVal := structValue.FieldByName(field.Name)
+               if !fieldVal.IsValid() {
+                       return errors.New(fmt.Sprintf("State [%s] not support 
[%s] filed", stateName, key))
+               }
+
+               //Get setMethod
+               var setMethod reflect.Value
+               if !fieldVal.CanSet() {
+                       setMethod = getFiledSetMethod(field.Name, objVal)
+
+                       if !setMethod.IsValid() {
+                               fieldAliasName := field.Tag.Get("alias")
+                               setMethod = getFiledSetMethod(fieldAliasName, 
objVal)
+                       }
+
+                       if !setMethod.IsValid() {
+                               return errors.New(fmt.Sprintf("State [%s] [%s] 
field not support setMethod", stateName, key))
+                       }
+                       setMethodType := setMethod.Type()
+                       if !(setMethodType.NumIn() == 1 && setMethodType.In(0) 
== fieldVal.Type()) {
+                               return errors.New(fmt.Sprintf("State [%s] [%s] 
field setMethod illegal", stateName, key))
+                       }
+               }
+
+               val := reflect.ValueOf(value)
+               if fieldVal.Kind() == reflect.Struct {
+                       //map[string]interface{}
+                       if val.Kind() != reflect.Map {
+                               return errors.New(fmt.Sprintf("State [%s] [%s] 
field type required map", stateName, key))
+                       }
+
+                       err := MapToStruct(stateName, 
fieldVal.Addr().Interface(), value.(map[string]interface{}))
+                       if err != nil {
+                               return err
+                       }
+               } else if fieldVal.Kind() == reflect.Slice {
+                       if val.Kind() != reflect.Slice {
+                               return errors.New(fmt.Sprintf("State [%s] [%s] 
field type required slice", stateName, key))
+                       }
+
+                       sliceType := fieldVal.Type().Elem()
+                       newSlice := reflect.MakeSlice(fieldVal.Type(), 0, 
val.Len())
+
+                       for i := 0; i < val.Len(); i++ {
+                               newElem := reflect.New(sliceType.Elem())
+                               elemMap := 
val.Index(i).Interface().(map[string]interface{})
+                               err := MapToStruct(stateName, 
newElem.Interface(), elemMap)
+                               if err != nil {
+                                       return err
+                               }
+                               reflect.Append(newSlice, newElem.Elem())
+                       }
+                       setFiled(fieldVal, setMethod, newSlice)
+               } else if fieldVal.Kind() == reflect.Map {
+                       if val.Kind() != reflect.Map {
+                               return errors.New(fmt.Sprintf("State [%s] [%s] 
field type required map", stateName, key))
+                       }
+
+                       mapType := field.Type
+                       newMap := reflect.MakeMap(mapType)
+
+                       for _, key := range val.MapKeys() {
+                               newVal := reflect.New(mapType.Elem().Elem())
+                               elemMap := 
val.MapIndex(key).Interface().(map[string]interface{})
+                               err := MapToStruct(stateName, 
newVal.Interface(), elemMap)
+                               if err != nil {
+                                       return err
+                               }
+                               newMap.SetMapIndex(key, newVal.Elem())
+                       }
+                       setFiled(fieldVal, setMethod, newMap)
+               } else {
+                       setFiled(fieldVal, setMethod, val)
+               }
+       }
+       return nil
+}
+
+func getField(t reflect.Type, name string) (reflect.StructField, bool) {
+       for i := 0; i < t.NumField(); i++ {
+               field := t.Field(i)
+               tag, hasAliasTag := field.Tag.Lookup("alias")
+
+               if (hasAliasTag && tag == name) || (!hasAliasTag && field.Name 
== name) {
+                       return field, true
+               }
+
+               if field.Anonymous {
+                       embeddedField, ok := getField(field.Type, name)
+                       if ok {
+                               return embeddedField, true
+                       }
+               }
+       }
+
+       return reflect.StructField{}, false
+}
+
+func getFiledSetMethod(name string, structValue reflect.Value) reflect.Value {
+       fieldNameSlice := []rune(name)
+       fieldNameSlice[0] = unicode.ToUpper(fieldNameSlice[0])
+
+       setMethodName := "Set" + string(fieldNameSlice)
+
+       setMethod := structValue.MethodByName(setMethodName)
+       return setMethod
+}
+
+func setFiled(fieldVal reflect.Value, setMethod reflect.Value, val 
reflect.Value) {
+       if !fieldVal.CanSet() {
+               setMethod.Call([]reflect.Value{
+                       val,
+               })
+       } else {
+               fieldVal.Set(val)
+       }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to