Code-Fight commented on code in PR #805:
URL: 
https://github.com/apache/incubator-seata-go/pull/805#discussion_r2120297732


##########
pkg/saga/statemachine/engine/core/default_statemachine_config.go:
##########
@@ -242,19 +277,417 @@ func (c *DefaultStateMachineConfig) 
SetRmReportSuccessEnable(rmReportSuccessEnab
        c.rmReportSuccessEnable = rmReportSuccessEnable
 }
 
-func NewDefaultStateMachineConfig() *DefaultStateMachineConfig {
+func (c *DefaultStateMachineConfig) GetStateMachineDefinition(name string) 
*statemachine.StateMachineObject {
+       return c.stateMachineDefs[name]
+}
+
+func (c *DefaultStateMachineConfig) GetExpressionFactory(expressionType 
string) expr.ExpressionFactory {
+       return c.expressionFactoryManager.GetExpressionFactory(expressionType)
+}
+
+func (c *DefaultStateMachineConfig) GetServiceInvoker(serviceType string) 
invoker.ServiceInvoker {
+       return c.serviceInvokerManager.ServiceInvoker(serviceType)
+}
+
+func (c *DefaultStateMachineConfig) RegisterStateMachineDef(resources 
[]string) error {
+       for _, resourcePath := range resources {
+               file, err := os.Open(resourcePath)
+               if err != nil {
+                       return fmt.Errorf("open resource file failed: path=%s, 
err=%w", resourcePath, err)
+               }
+               defer file.Close()
+
+               if err := 
c.stateMachineRepository.RegistryStateMachineByReader(file); err != nil {
+                       return fmt.Errorf("register state machine from file 
failed: path=%s, err=%w", resourcePath, err)
+               }
+       }
+       return nil
+}
+
+func (c *DefaultStateMachineConfig) RegisterExpressionFactory(expressionType 
string, factory expr.ExpressionFactory) {
+       c.expressionFactoryManager.PutExpressionFactory(expressionType, factory)
+}
+
+func (c *DefaultStateMachineConfig) RegisterServiceInvoker(serviceType string, 
invoker invoker.ServiceInvoker) {
+       c.serviceInvokerManager.PutServiceInvoker(serviceType, invoker)
+}
+
+type ConfigFileParams struct {
+       TransOperationTimeout           int      
`json:"trans_operation_timeout" yaml:"trans_operation_timeout"`
+       ServiceInvokeTimeout            int      `json:"service_invoke_timeout" 
yaml:"service_invoke_timeout"`
+       Charset                         string   `json:"charset" yaml:"charset"`
+       DefaultTenantId                 string   `json:"default_tenant_id" 
yaml:"default_tenant_id"`
+       SagaRetryPersistModeUpdate      bool     
`json:"saga_retry_persist_mode_update" yaml:"saga_retry_persist_mode_update"`
+       SagaCompensatePersistModeUpdate bool     
`json:"saga_compensate_persist_mode_update" 
yaml:"saga_compensate_persist_mode_update"`
+       SagaBranchRegisterEnable        bool     
`json:"saga_branch_register_enable" yaml:"saga_branch_register_enable"`
+       RmReportSuccessEnable           bool     
`json:"rm_report_success_enable" yaml:"rm_report_success_enable"`
+       StateMachineResources           []string 
`json:"state_machine_resources" yaml:"state_machine_resources"`
+}
+
+type SequenceExpressionFactory struct {
+       seqGenerator sequence.SeqGenerator
+}
+
+var _ expr.ExpressionFactory = (*SequenceExpressionFactory)(nil)
+
+func NewSequenceExpressionFactory(seqGenerator sequence.SeqGenerator) 
*SequenceExpressionFactory {
+       return &SequenceExpressionFactory{
+               seqGenerator: seqGenerator,
+       }
+}
+
+func (f *SequenceExpressionFactory) CreateExpression(expression string) 
expr.Expression {
+       parts := strings.Split(expression, "|")
+       if len(parts) != 2 {
+               return &ErrorExpression{
+                       err:           fmt.Errorf("invalid sequence expression 
format: %s, expected 'entity|rule'", expression),
+                       expressionStr: expression,
+               }
+       }
+
+       seqExpr := &expr.SequenceExpression{}
+       seqExpr.SetSeqGenerator(f.seqGenerator)
+       seqExpr.SetEntity(strings.TrimSpace(parts[0]))
+       seqExpr.SetRule(strings.TrimSpace(parts[1]))
+
+       return seqExpr
+}
+
+type ErrorExpression struct {
+       err           error
+       expressionStr string
+}
+
+func (e *ErrorExpression) Value(elContext any) any {
+       return e.err
+}
+
+func (e *ErrorExpression) SetValue(value any, elContext any) {
+       //错误表达式不设置值

Review Comment:
   Why hasn’t this been implemented here? Why hasn’t this been implemented here?



##########
pkg/saga/statemachine/engine/invoker/invoker.go:
##########
@@ -19,11 +19,29 @@ package invoker
 
 import (
        "context"
-       "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
+       "encoding/json"
+       "fmt"
        "reflect"
        "sync"
+
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
 )
 
+type JsonParser interface {

Review Comment:
   I understand that what’s needed here is just a Parser interface. Based on 
the abstraction of the parser, there can be different serialization formats, 
such as JSON, Protobuf, etc. However, since currently only the JSON format is 
used, this abstraction might not be necessary.
   
   我理解这里需要的是一个Parser的 
interface即可。基于parser的抽象,有不同的序列化方式,比如json,pb等。但是目前只有json的格式,所以这个抽象也许用不到



##########
pkg/saga/statemachine/engine/core/default_statemachine_config.go:
##########
@@ -242,19 +277,417 @@ func (c *DefaultStateMachineConfig) 
SetRmReportSuccessEnable(rmReportSuccessEnab
        c.rmReportSuccessEnable = rmReportSuccessEnable
 }
 
-func NewDefaultStateMachineConfig() *DefaultStateMachineConfig {
+func (c *DefaultStateMachineConfig) GetStateMachineDefinition(name string) 
*statemachine.StateMachineObject {
+       return c.stateMachineDefs[name]
+}
+
+func (c *DefaultStateMachineConfig) GetExpressionFactory(expressionType 
string) expr.ExpressionFactory {
+       return c.expressionFactoryManager.GetExpressionFactory(expressionType)
+}
+
+func (c *DefaultStateMachineConfig) GetServiceInvoker(serviceType string) 
invoker.ServiceInvoker {
+       return c.serviceInvokerManager.ServiceInvoker(serviceType)
+}
+
+func (c *DefaultStateMachineConfig) RegisterStateMachineDef(resources 
[]string) error {
+       for _, resourcePath := range resources {
+               file, err := os.Open(resourcePath)
+               if err != nil {
+                       return fmt.Errorf("open resource file failed: path=%s, 
err=%w", resourcePath, err)
+               }
+               defer file.Close()
+
+               if err := 
c.stateMachineRepository.RegistryStateMachineByReader(file); err != nil {
+                       return fmt.Errorf("register state machine from file 
failed: path=%s, err=%w", resourcePath, err)
+               }
+       }
+       return nil
+}
+
+func (c *DefaultStateMachineConfig) RegisterExpressionFactory(expressionType 
string, factory expr.ExpressionFactory) {
+       c.expressionFactoryManager.PutExpressionFactory(expressionType, factory)
+}
+
+func (c *DefaultStateMachineConfig) RegisterServiceInvoker(serviceType string, 
invoker invoker.ServiceInvoker) {
+       c.serviceInvokerManager.PutServiceInvoker(serviceType, invoker)
+}
+
+type ConfigFileParams struct {
+       TransOperationTimeout           int      
`json:"trans_operation_timeout" yaml:"trans_operation_timeout"`
+       ServiceInvokeTimeout            int      `json:"service_invoke_timeout" 
yaml:"service_invoke_timeout"`
+       Charset                         string   `json:"charset" yaml:"charset"`
+       DefaultTenantId                 string   `json:"default_tenant_id" 
yaml:"default_tenant_id"`
+       SagaRetryPersistModeUpdate      bool     
`json:"saga_retry_persist_mode_update" yaml:"saga_retry_persist_mode_update"`
+       SagaCompensatePersistModeUpdate bool     
`json:"saga_compensate_persist_mode_update" 
yaml:"saga_compensate_persist_mode_update"`
+       SagaBranchRegisterEnable        bool     
`json:"saga_branch_register_enable" yaml:"saga_branch_register_enable"`
+       RmReportSuccessEnable           bool     
`json:"rm_report_success_enable" yaml:"rm_report_success_enable"`
+       StateMachineResources           []string 
`json:"state_machine_resources" yaml:"state_machine_resources"`
+}
+
+type SequenceExpressionFactory struct {
+       seqGenerator sequence.SeqGenerator
+}
+
+var _ expr.ExpressionFactory = (*SequenceExpressionFactory)(nil)
+
+func NewSequenceExpressionFactory(seqGenerator sequence.SeqGenerator) 
*SequenceExpressionFactory {
+       return &SequenceExpressionFactory{
+               seqGenerator: seqGenerator,
+       }
+}
+
+func (f *SequenceExpressionFactory) CreateExpression(expression string) 
expr.Expression {
+       parts := strings.Split(expression, "|")
+       if len(parts) != 2 {
+               return &ErrorExpression{
+                       err:           fmt.Errorf("invalid sequence expression 
format: %s, expected 'entity|rule'", expression),
+                       expressionStr: expression,
+               }
+       }
+
+       seqExpr := &expr.SequenceExpression{}
+       seqExpr.SetSeqGenerator(f.seqGenerator)
+       seqExpr.SetEntity(strings.TrimSpace(parts[0]))
+       seqExpr.SetRule(strings.TrimSpace(parts[1]))
+
+       return seqExpr
+}
+
+type ErrorExpression struct {
+       err           error
+       expressionStr string
+}
+
+func (e *ErrorExpression) Value(elContext any) any {
+       return e.err
+}
+
+func (e *ErrorExpression) SetValue(value any, elContext any) {
+       //错误表达式不设置值
+}
+
+func (e *ErrorExpression) ExpressionString() string {
+       return e.expressionStr
+}
+
+func (c *DefaultStateMachineConfig) LoadConfig(configPath string) error {
+       if c.seqGenerator == nil {
+               c.seqGenerator = sequence.NewUUIDSeqGenerator()
+       }
+
+       content, err := os.ReadFile(configPath)
+       if err != nil {
+               return fmt.Errorf("failed to read config file: path=%s, 
error=%w", configPath, err)
+       }
+
+       parser := parser.NewStateMachineConfigParser()
+       smo, err := parser.Parse(content)
+       if err != nil {
+               return fmt.Errorf("failed to parse state machine definition: 
path=%s, error=%w", configPath, err)
+       }
+
+       var configFileParams ConfigFileParams
+       if err := json.Unmarshal(content, &configFileParams); err != nil {
+               if err := yaml.Unmarshal(content, &configFileParams); err != 
nil {
+               } else {
+                       c.applyConfigFileParams(&configFileParams)
+               }
+       } else {
+               c.applyConfigFileParams(&configFileParams)
+       }
+
+       if _, exists := c.stateMachineDefs[smo.Name]; exists {
+               return fmt.Errorf("state machine definition with name %s 
already exists", smo.Name)
+       }
+       c.stateMachineDefs[smo.Name] = smo
+
+       return nil
+}
+
+func (c *DefaultStateMachineConfig) applyConfigFileParams(rc 
*ConfigFileParams) {
+       if rc.TransOperationTimeout > 0 {
+               c.transOperationTimeout = rc.TransOperationTimeout
+       }
+       if rc.ServiceInvokeTimeout > 0 {
+               c.serviceInvokeTimeout = rc.ServiceInvokeTimeout
+       }
+       if rc.Charset != "" {
+               c.charset = rc.Charset
+       }
+       if rc.DefaultTenantId != "" {
+               c.defaultTenantId = rc.DefaultTenantId
+       }
+       c.sagaRetryPersistModeUpdate = rc.SagaRetryPersistModeUpdate
+       c.sagaCompensatePersistModeUpdate = rc.SagaCompensatePersistModeUpdate
+       c.sagaBranchRegisterEnable = rc.SagaBranchRegisterEnable
+       c.rmReportSuccessEnable = rc.RmReportSuccessEnable
+       if len(rc.StateMachineResources) > 0 {
+               c.stateMachineResources = rc.StateMachineResources
+       }
+}
+
+func (c *DefaultStateMachineConfig) registerEventConsumers() error {
+       if c.processController == nil {
+               return fmt.Errorf("ProcessController is not initialized")
+       }
+
+       pcImpl, ok := c.processController.(*ProcessControllerImpl)
+       if !ok {
+               return fmt.Errorf("ProcessController is not an instance of 
ProcessControllerImpl")
+       }
+
+       if pcImpl.businessProcessor == nil {
+               return fmt.Errorf("BusinessProcessor in ProcessController is 
not initialized")
+       }
+
+       processCtrlConsumer := &ProcessCtrlEventConsumer{
+               processController: c.processController,
+       }
+
+       c.syncEventBus.RegisterEventConsumer(processCtrlConsumer)
+       c.asyncEventBus.RegisterEventConsumer(processCtrlConsumer)
+
+       return nil
+}
+
+func (c *DefaultStateMachineConfig) Init() error {
+       if err := c.initExpressionComponents(); err != nil {
+               return fmt.Errorf("initialize expression components failed: 
%w", err)
+       }
+
+       if err := c.initServiceInvokers(); err != nil {
+               return fmt.Errorf("initialize service invokers failed: %w", err)
+       }
+
+       if err := c.registerEventConsumers(); err != nil {
+               return fmt.Errorf("register event consumers failed: %w", err)
+       }
+
+       if c.stateMachineRepository != nil && len(c.stateMachineResources) > 0 {
+               if err := c.RegisterStateMachineDef(c.stateMachineResources); 
err != nil {
+                       return fmt.Errorf("register state machine def failed: 
%w", err)
+               }
+       }
+
+       if err := c.Validate(); err != nil {
+               return fmt.Errorf("configuration validation failed: %w", err)
+       }
+
+       return nil
+}
+
+func (c *DefaultStateMachineConfig) initExpressionComponents() error {
+
+       if c.expressionFactoryManager == nil {
+               c.expressionFactoryManager = expr.NewExpressionFactoryManager()
+       }
+
+       defaultType := expr.DefaultExpressionType
+       if defaultType == "" {
+               defaultType = "Default"
+       }
+
+       if factory := 
c.expressionFactoryManager.GetExpressionFactory(defaultType); factory == nil {
+               c.RegisterExpressionFactory(defaultType, 
expr.NewCELExpressionFactory())
+       }
+
+       if factory := c.expressionFactoryManager.GetExpressionFactory("CEL"); 
factory == nil {
+               c.RegisterExpressionFactory("CEL", 
expr.NewCELExpressionFactory())
+       }
+
+       if factory := c.expressionFactoryManager.GetExpressionFactory("el"); 
factory == nil {
+               c.RegisterExpressionFactory("el", 
expr.NewCELExpressionFactory())
+       }
+
+       if c.seqGenerator != nil {
+               sequenceFactory := NewSequenceExpressionFactory(c.seqGenerator)
+               c.RegisterExpressionFactory("SEQUENCE", sequenceFactory)
+               c.RegisterExpressionFactory("SEQ", sequenceFactory)
+       }
+
+       if c.expressionResolver == nil {
+               resolver := &expr.DefaultExpressionResolver{}
+               
resolver.SetExpressionFactoryManager(*c.expressionFactoryManager)
+               c.expressionResolver = resolver
+       }
+
+       return nil
+}
+
+func (c *DefaultStateMachineConfig) initServiceInvokers() error {
+       if c.serviceInvokerManager == nil {
+               c.serviceInvokerManager = invoker.NewServiceInvokerManagerImpl()
+       }
+
+       defaultServiceType := "local"
+       if existingInvoker := 
c.serviceInvokerManager.ServiceInvoker(defaultServiceType); existingInvoker == 
nil {
+               c.RegisterServiceInvoker(defaultServiceType, 
invoker.NewLocalServiceInvoker())
+       }
+
+       return nil
+}
+
+func (c *DefaultStateMachineConfig) Validate() error {
+       var errs []error
+
+       if c.expressionFactoryManager == nil {
+               errs = append(errs, fmt.Errorf("expression factory manager is 
nil"))
+       }
+
+       if c.expressionResolver == nil {
+               errs = append(errs, fmt.Errorf("expression resolver is nil"))
+       }
+
+       if c.serviceInvokerManager == nil {
+               errs = append(errs, fmt.Errorf("service invoker manager is 
nil"))
+       }
+
+       if c.transOperationTimeout <= 0 {
+               errs = append(errs, fmt.Errorf("invalid trans operation 
timeout: %d", c.transOperationTimeout))
+       }
+
+       if c.serviceInvokeTimeout <= 0 {
+               errs = append(errs, fmt.Errorf("invalid service invoke timeout: 
%d", c.serviceInvokeTimeout))
+       }
+
+       if c.charset == "" {
+               errs = append(errs, fmt.Errorf("charset is empty"))
+       }
+
+       if c.stateLogStore == nil {
+               errs = append(errs, fmt.Errorf("state log store is nil"))
+       }
+
+       if c.stateLangStore == nil {
+               errs = append(errs, fmt.Errorf("state lang store is nil"))
+       }
+
+       if c.statusDecisionStrategy == nil {
+               errs = append(errs, fmt.Errorf("status decision strategy is 
nil"))
+       }
+
+       if c.syncEventBus == nil {
+               errs = append(errs, fmt.Errorf("sync event bus is nil"))
+       }
+       if c.asyncEventBus == nil {
+               errs = append(errs, fmt.Errorf("async event bus is nil"))
+       }
+
+       if c.stateLogRepository == nil {
+               errs = append(errs, fmt.Errorf("state log repository is nil"))
+       }
+
+       if len(errs) > 0 {
+               return fmt.Errorf("configuration validation failed with %d 
errors: %v", len(errs), errs)
+       }
+
+       return nil
+}
+
+func (c *DefaultStateMachineConfig) EvaluateExpression(expressionStr string, 
context any) (any, error) {
+       if c.expressionResolver == nil {
+               return nil, fmt.Errorf("expression resolver not initialized")
+       }
+
+       expression := c.expressionResolver.Expression(expressionStr)
+       if expression == nil {
+               return nil, fmt.Errorf("failed to parse expression: %s", 
expressionStr)
+       }
+
+       var result any
+       var evalErr error
+
+       func() {
+               defer func() {
+                       if r := recover(); r != nil {
+                               evalErr = fmt.Errorf("expression evaluation 
panicked: %v", r)
+                       }
+               }()
+
+               result = expression.Value(context)
+       }()
+
+       if evalErr != nil {
+               return nil, evalErr
+       }
+
+       if err, ok := result.(error); ok {
+               return nil, fmt.Errorf("expression evaluation returned error: 
%w", err)
+       }
+
+       return result, nil
+}
+
+func NewDefaultBusinessProcessor() *DefaultBusinessProcessor {
+       return &DefaultBusinessProcessor{
+               processHandlers: make(map[string]ProcessHandler),
+               routerHandlers:  make(map[string]RouterHandler),
+       }
+}
+
+func NewDefaultStateMachineConfig(opts ...Option) *DefaultStateMachineConfig {
+       ctx := context.Background()
+       defaultBP := NewDefaultBusinessProcessor()
+
        c := &DefaultStateMachineConfig{
                transOperationTimeout:           DefaultTransOperTimeout,
                serviceInvokeTimeout:            DefaultServiceInvokeTimeout,
                charset:                         "UTF-8",
                defaultTenantId:                 "000001",
+               stateMachineResources:           
[]string{"classpath*:seata/saga/statelang/**/*.json"},

Review Comment:
   Why is there a Java-style expression like “classpath: xxx”?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org
For additional commands, e-mail: notifications-h...@seata.apache.org

Reply via email to