This is an automated email from the ASF dual-hosted git repository.

zfeng pushed a commit to branch feature/saga
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git


The following commit(s) were added to refs/heads/feature/saga by this push:
     new dc0a7a14 Implement SagaResource and SagaResourceManager (#855)
dc0a7a14 is described below

commit dc0a7a1410b3b8900e963a47db677c15103067c7
Author: Wiggins <125641755+minat...@users.noreply.github.com>
AuthorDate: Sat Aug 30 14:56:51 2025 +0800

    Implement SagaResource and SagaResourceManager (#855)
    
    * feat : impl SagaResource #843
    
    * feat : impl SagaResource #843
    
    * feat : impl SagaResourceManager #843
    
    * feat : impl SagaResourceManager BranchRollback #843
    
    * feat : impl handler_saga #843
    
    * feat : impl invalid_exception_test #843
    
    * update #843
    
    * update:elegant code #843
    
    * abstracting tm branch registration out to rm and add test #846
    
    * test #846
    
    * add licensed  #846
    
    * replace panic to return #843
    
    * update  #843
    
    * update init and handler_saga  #843
    
    ---------
    
    Co-authored-by: FengZhang <zfc...@qq.com>
---
 pkg/client/client.go                               |   2 +
 pkg/saga/rm/handler_saga.go                        |  35 +++++
 pkg/saga/rm/saga_resource.go                       |  52 +++++++
 pkg/saga/rm/saga_resource_manager.go               | 159 +++++++++++++++++++++
 pkg/saga/rm/saga_resource_manager_test.go          |  43 ++++++
 pkg/saga/rm/state_machine_engine_holder.go         |  38 +++++
 .../statemachine/engine/exception/exception.go     |  12 +-
 .../engine/exception/exception_test.go             |  68 +++++++++
 .../engine/exception/forward_invalid_exception.go  |  32 +++++
 .../exception/forward_invalid_exception_test.go    |  68 +++++++++
 pkg/saga/tm/default_saga_transactional_template.go |  12 +-
 11 files changed, 510 insertions(+), 11 deletions(-)

diff --git a/pkg/client/client.go b/pkg/client/client.go
index 54a56888..aafdaa08 100644
--- a/pkg/client/client.go
+++ b/pkg/client/client.go
@@ -30,6 +30,7 @@ import (
        "github.com/seata/seata-go/pkg/remoting/processor/client"
        "github.com/seata/seata-go/pkg/rm"
        "github.com/seata/seata-go/pkg/rm/tcc"
+       saga "github.com/seata/seata-go/pkg/saga/rm"
        "github.com/seata/seata-go/pkg/tm"
        "github.com/seata/seata-go/pkg/util/log"
 )
@@ -87,6 +88,7 @@ func initRmClient(cfg *Config) {
                client.RegisterProcessor()
                integration.Init()
                tcc.InitTCC()
+               saga.InitSaga()
                at.InitAT(cfg.ClientConfig.UndoConfig, cfg.AsyncWorkerConfig)
                at.InitXA(cfg.ClientConfig.XaConfig)
        })
diff --git a/pkg/saga/rm/handler_saga.go b/pkg/saga/rm/handler_saga.go
new file mode 100644
index 00000000..7bb87ff4
--- /dev/null
+++ b/pkg/saga/rm/handler_saga.go
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rm
+
+import (
+       "github.com/seata/seata-go/pkg/protocol/branch"
+       "github.com/seata/seata-go/pkg/rm"
+)
+
+type RMHandlerSaga struct{}
+
+func (h *RMHandlerSaga) HandleUndoLogDeleteRequest(request interface{}) {
+       // do nothing
+}
+func (h *RMHandlerSaga) GetResourceManager() rm.ResourceManager {
+       return rm.GetRmCacheInstance().GetResourceManager(branch.BranchTypeSAGA)
+}
+func (h *RMHandlerSaga) GetBranchType() branch.BranchType {
+       return branch.BranchTypeSAGA
+}
diff --git a/pkg/saga/rm/saga_resource.go b/pkg/saga/rm/saga_resource.go
new file mode 100644
index 00000000..2b145a4d
--- /dev/null
+++ b/pkg/saga/rm/saga_resource.go
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rm
+
+import (
+       "fmt"
+       "github.com/seata/seata-go/pkg/protocol/branch"
+)
+
+type SagaResource struct {
+       resourceGroupId string
+       applicationId   string
+}
+
+func (r *SagaResource) GetResourceGroupId() string {
+       return r.resourceGroupId
+}
+
+func (r *SagaResource) SetResourceGroupId(resourceGroupId string) {
+       r.resourceGroupId = resourceGroupId
+}
+
+func (r *SagaResource) GetResourceId() string {
+       return fmt.Sprintf("%s#%s", r.applicationId, r.resourceGroupId)
+}
+
+func (r *SagaResource) GetBranchType() branch.BranchType {
+       return branch.BranchTypeSAGA
+}
+
+func (r *SagaResource) GetApplicationId() string {
+       return r.applicationId
+}
+
+func (r *SagaResource) SetApplicationId(applicationId string) {
+       r.applicationId = applicationId
+}
diff --git a/pkg/saga/rm/saga_resource_manager.go b/pkg/saga/rm/saga_resource_manager.go
new file mode 100644
index 00000000..55f08b75
--- /dev/null
+++ b/pkg/saga/rm/saga_resource_manager.go
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rm
+
+import (
+       "bytes"
+       "context"
+       "fmt"
+       "log"
+       "sync"
+
+       "github.com/seata/seata-go/pkg/protocol/branch"
+       "github.com/seata/seata-go/pkg/protocol/message"
+       "github.com/seata/seata-go/pkg/rm"
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine/exception"
+       "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
+       seataErrors "github.com/seata/seata-go/pkg/util/errors"
+)
+
+var (
+       sagaResourceManagerInstance *SagaResourceManager
+       once                        sync.Once
+)
+
+type SagaResourceManager struct {
+       rmRemoting    *rm.RMRemoting
+       resourceCache sync.Map
+}
+
+func InitSaga() {
+       rm.GetRmCacheInstance().RegisterResourceManager(GetSagaResourceManager())
+}
+
+func GetSagaResourceManager() *SagaResourceManager {
+       once.Do(func() {
+               sagaResourceManagerInstance = &SagaResourceManager{
+                       rmRemoting:    rm.GetRMRemotingInstance(),
+                       resourceCache: sync.Map{},
+               }
+       })
+       return sagaResourceManagerInstance
+}
+
+func (s *SagaResourceManager) RegisterResource(resource rm.Resource) error {
+       if _, ok := resource.(*SagaResource); !ok {
+               return fmt.Errorf("register saga resource error, SagaResource is needed, param %v", resource)
+       }
+       s.resourceCache.Store(resource.GetResourceId(), resource)
+       return s.rmRemoting.RegisterResource(resource)
+
+}
+
+func (s *SagaResourceManager) GetCachedResources() *sync.Map {
+       return &s.resourceCache
+}
+
+func (s *SagaResourceManager) GetBranchType() branch.BranchType {
+       return branch.BranchTypeSAGA
+}
+
+func (s *SagaResourceManager) BranchCommit(ctx context.Context, resource rm.BranchResource) (branch.BranchStatus, error) {
+       engine := GetStateMachineEngine()
+       stMaInst, err := engine.Forward(ctx, resource.Xid, nil)
+       if err != nil {
+               if fie, ok := exception.IsForwardInvalidException(err); ok {
+                       log.Printf("StateMachine forward failed, xid: %s, err: %v", resource.Xid, err)
+                       if isInstanceNotExists(fie.ErrCode) {
+                               return branch.BranchStatusPhasetwoCommitted, nil
+                       }
+               }
+               log.Printf("StateMachine forward failed, xid: %s, err: %v", resource.Xid, err)
+               return branch.BranchStatusPhasetwoCommitFailedRetryable, err
+       }
+
+       status := stMaInst.Status()
+       compStatus := stMaInst.CompensationStatus()
+
+       switch {
+       case status == statelang.SU && compStatus == "":
+               return branch.BranchStatusPhasetwoCommitted, nil
+       case compStatus == statelang.SU:
+               return branch.BranchStatusPhasetwoRollbacked, nil
+       case compStatus == statelang.FA || compStatus == statelang.UN:
+               return branch.BranchStatusPhasetwoRollbackFailedRetryable, nil
+       case status == statelang.FA && compStatus == "":
+               return branch.BranchStatusPhaseoneFailed, nil
+       default:
+               return branch.BranchStatusPhasetwoCommitFailedRetryable, nil
+       }
+}
+
+func (s *SagaResourceManager) BranchRollback(ctx context.Context, resource rm.BranchResource) (branch.BranchStatus, error) {
+       engine := GetStateMachineEngine()
+       stMaInst, err := engine.ReloadStateMachineInstance(ctx, resource.Xid)
+       if err != nil || stMaInst == nil {
+               return branch.BranchStatusPhasetwoRollbacked, nil
+       }
+
+       strategy := stMaInst.StateMachine().RecoverStrategy()
+       appData := resource.ApplicationData
+       isTimeoutRollback := bytes.Equal(appData, []byte{byte(message.GlobalStatusTimeoutRollbacking)}) || bytes.Equal(appData, []byte{byte(message.GlobalStatusTimeoutRollbackRetrying)})
+
+       if strategy == statelang.Forward && isTimeoutRollback {
+               log.Printf("Retry by custom recover strategy [Forward] on timeout, SAGA global[%s]", resource.Xid)
+               return branch.BranchStatusPhasetwoCommitFailedRetryable, nil
+       }
+
+       stMaInst, err = engine.Compensate(ctx, resource.Xid, nil)
+       if err == nil && stMaInst.CompensationStatus() == statelang.SU {
+               return branch.BranchStatusPhasetwoRollbacked, nil
+       }
+
+       if fie, ok := exception.IsEngineExecutionException(err); ok {
+               log.Printf("StateMachine compensate failed, xid: %s, err: %v", resource.Xid, err)
+               if isInstanceNotExists(fie.ErrCode) {
+                       return branch.BranchStatusPhasetwoRollbacked, nil
+               }
+       }
+       log.Printf("StateMachine compensate failed, xid: %s, err: %v", resource.Xid, err)
+       return branch.BranchStatusPhasetwoRollbackFailedRetryable, err
+}
+
+func (s *SagaResourceManager) BranchRegister(ctx context.Context, param rm.BranchRegisterParam) (int64, error) {
+       return s.rmRemoting.BranchRegister(param)
+}
+
+func (s *SagaResourceManager) BranchReport(ctx context.Context, param rm.BranchReportParam) error {
+       return s.rmRemoting.BranchReport(param)
+}
+
+func (s *SagaResourceManager) LockQuery(ctx context.Context, param rm.LockQueryParam) (bool, error) {
+       // LockQuery is not supported for Saga resources
+       return false, fmt.Errorf("LockQuery is not supported for Saga resources")
+}
+
+func (s *SagaResourceManager) UnregisterResource(resource rm.Resource) error {
+       // UnregisterResource is not supported for SagaResourceManager
+       return fmt.Errorf("UnregisterResource is not supported for SagaResourceManager")
+}
+
+// isInstanceNotExists checks if the error code indicates StateMachineInstanceNotExists
+func isInstanceNotExists(errCode string) bool {
+       return errCode == fmt.Sprintf("%v", seataErrors.StateMachineInstanceNotExists)
+}
diff --git a/pkg/saga/rm/saga_resource_manager_test.go b/pkg/saga/rm/saga_resource_manager_test.go
new file mode 100644
index 00000000..61af679f
--- /dev/null
+++ b/pkg/saga/rm/saga_resource_manager_test.go
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rm
+
+import (
+       "sync"
+       "testing"
+)
+
+func TestGetSagaResourceManager_Singleton(t *testing.T) {
+       var wg sync.WaitGroup
+       instances := make([]*SagaResourceManager, 10)
+       for i := 0; i < 10; i++ {
+               wg.Add(1)
+               go func(idx int) {
+                       defer wg.Done()
+                       instances[idx] = GetSagaResourceManager()
+               }(i)
+       }
+       wg.Wait()
+
+       first := instances[0]
+       for i, inst := range instances {
+               if inst != first {
+                       t.Errorf("Instance at index %d is not the same as the first instance", i)
+               }
+       }
+}
diff --git a/pkg/saga/rm/state_machine_engine_holder.go b/pkg/saga/rm/state_machine_engine_holder.go
new file mode 100644
index 00000000..6aaa4ee0
--- /dev/null
+++ b/pkg/saga/rm/state_machine_engine_holder.go
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rm
+
+import (
+       "github.com/seata/seata-go/pkg/saga/statemachine/engine"
+       "sync"
+)
+
+var (
+       stateMachineEngine     engine.StateMachineEngine
+       stateMachineEngineOnce sync.Once
+)
+
+func GetStateMachineEngine() engine.StateMachineEngine {
+       return stateMachineEngine
+}
+
+func SetStateMachineEngine(smEngine engine.StateMachineEngine) {
+       stateMachineEngineOnce.Do(func() {
+               stateMachineEngine = smEngine
+       })
+}
diff --git a/pkg/saga/statemachine/engine/exception/exception.go b/pkg/saga/statemachine/engine/exception/exception.go
index 708bd6cd..7c2e475e 100644
--- a/pkg/saga/statemachine/engine/exception/exception.go
+++ b/pkg/saga/statemachine/engine/exception/exception.go
@@ -18,6 +18,7 @@
 package exception
 
 import (
+       perror "errors"
        "fmt"
        "github.com/seata/seata-go/pkg/util/errors"
 )
@@ -41,6 +42,13 @@ func NewEngineExecutionException(code errors.TransactionErrorCode, msg string, p
                SeataError: *seataError,
        }
 }
+func IsEngineExecutionException(err error) (*EngineExecutionException, bool) {
+       var fie *EngineExecutionException
+       if perror.As(err, &fie) {
+               return fie, true
+       }
+       return nil, false
+}
 
 func (e *EngineExecutionException) StateName() string {
        return e.stateName
@@ -73,7 +81,3 @@ func (e *EngineExecutionException) StateInstanceId() string {
 func (e *EngineExecutionException) SetStateInstanceId(stateInstanceId string) {
        e.stateInstanceId = stateInstanceId
 }
-
-type ForwardInvalidException struct {
-       EngineExecutionException
-}
diff --git a/pkg/saga/statemachine/engine/exception/exception_test.go b/pkg/saga/statemachine/engine/exception/exception_test.go
new file mode 100644
index 00000000..e7bdee84
--- /dev/null
+++ b/pkg/saga/statemachine/engine/exception/exception_test.go
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package exception
+
+import (
+       "errors"
+       "testing"
+
+       pkgerr "github.com/seata/seata-go/pkg/util/errors"
+)
+
+func TestIsEngineExecutionException(t *testing.T) {
+       cases := []struct {
+               name    string
+               err     error
+               wantOk  bool
+               wantMsg string
+       }{
+               {
+                       name:    "EngineExecutionException",
+                       err:     &EngineExecutionException{SeataError: pkgerr.SeataError{Message: "engine error"}},
+                       wantOk:  true,
+                       wantMsg: "engine error",
+               },
+               {
+                       name:    "Other error",
+                       err:     errors.New("some other error"),
+                       wantOk:  false,
+                       wantMsg: "",
+               },
+               {
+                       name:    "nil error",
+                       err:     nil,
+                       wantOk:  false,
+                       wantMsg: "",
+               },
+       }
+
+       for _, c := range cases {
+               t.Run(c.name, func(t *testing.T) {
+                       fie, ok := IsEngineExecutionException(c.err)
+                       if ok != c.wantOk {
+                               t.Errorf("expected ok=%v, got %v", c.wantOk, ok)
+                       }
+                       if ok && fie.SeataError.Message != c.wantMsg {
+                               t.Errorf("expected Message=%q, got %q", c.wantMsg, fie.SeataError.Message)
+                       }
+                       if !ok && fie != nil {
+                               t.Errorf("expected fie=nil, got %v", fie)
+                       }
+               })
+       }
+}
diff --git a/pkg/saga/statemachine/engine/exception/forward_invalid_exception.go b/pkg/saga/statemachine/engine/exception/forward_invalid_exception.go
new file mode 100644
index 00000000..ac79903e
--- /dev/null
+++ b/pkg/saga/statemachine/engine/exception/forward_invalid_exception.go
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package exception
+
+import "errors"
+
+type ForwardInvalidException struct {
+       EngineExecutionException
+}
+
+func IsForwardInvalidException(err error) (*ForwardInvalidException, bool) {
+       var fie *ForwardInvalidException
+       if errors.As(err, &fie) {
+               return fie, true
+       }
+       return nil, false
+}
diff --git a/pkg/saga/statemachine/engine/exception/forward_invalid_exception_test.go b/pkg/saga/statemachine/engine/exception/forward_invalid_exception_test.go
new file mode 100644
index 00000000..93779a98
--- /dev/null
+++ b/pkg/saga/statemachine/engine/exception/forward_invalid_exception_test.go
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package exception
+
+import (
+       "errors"
+       "testing"
+
+       pkgerr "github.com/seata/seata-go/pkg/util/errors"
+)
+
+func TestIsForwardInvalidException(t *testing.T) {
+       cases := []struct {
+               name    string
+               err     error
+               wantOk  bool
+               wantMsg string
+       }{
+               {
+                       name:    "ForwardInvalidException",
+                       err:     &ForwardInvalidException{EngineExecutionException: EngineExecutionException{SeataError: pkgerr.SeataError{Message: "forward invalid"}}},
+                       wantOk:  true,
+                       wantMsg: "forward invalid",
+               },
+               {
+                       name:    "Other error",
+                       err:     errors.New("some other error"),
+                       wantOk:  false,
+                       wantMsg: "",
+               },
+               {
+                       name:    "nil error",
+                       err:     nil,
+                       wantOk:  false,
+                       wantMsg: "",
+               },
+       }
+
+       for _, c := range cases {
+               t.Run(c.name, func(t *testing.T) {
+                       fie, ok := IsForwardInvalidException(c.err)
+                       if ok != c.wantOk {
+                               t.Errorf("expected ok=%v, got %v", c.wantOk, ok)
+                       }
+                       if ok && fie.SeataError.Message != c.wantMsg {
+                               t.Errorf("expected Message=%q, got %q", c.wantMsg, fie.SeataError.Message)
+                       }
+                       if !ok && fie != nil {
+                               t.Errorf("expected fie=nil, got %v", fie)
+                       }
+               })
+       }
+}
diff --git a/pkg/saga/tm/default_saga_transactional_template.go b/pkg/saga/tm/default_saga_transactional_template.go
index d9bf1751..dbf30f1c 100644
--- a/pkg/saga/tm/default_saga_transactional_template.go
+++ b/pkg/saga/tm/default_saga_transactional_template.go
@@ -19,13 +19,13 @@ package tm
 
 import (
        "context"
-       "github.com/seata/seata-go/pkg/rm"
-       "time"
-
        "github.com/seata/seata-go/pkg/protocol/branch"
        "github.com/seata/seata-go/pkg/protocol/message"
+       "github.com/seata/seata-go/pkg/rm"
+       sagarm "github.com/seata/seata-go/pkg/saga/rm"
        "github.com/seata/seata-go/pkg/tm"
        "github.com/seata/seata-go/pkg/util/log"
+       "time"
 )
 
 type DefaultSagaTransactionalTemplate struct {
@@ -82,8 +82,7 @@ func (t *DefaultSagaTransactionalTemplate) ReportTransaction(ctx context.Context
 }
 
 func (t *DefaultSagaTransactionalTemplate) BranchRegister(ctx context.Context, resourceId string, clientId string, xid string, applicationData string, lockKeys string) (int64, error) {
-       //todo Wait implement sagaResource
-       return rm.GetRMRemotingInstance().BranchRegister(rm.BranchRegisterParam{
+       return sagarm.GetSagaResourceManager().BranchRegister(ctx, rm.BranchRegisterParam{
                BranchType:      branch.BranchTypeSAGA,
                ResourceId:      resourceId,
                Xid:             xid,
@@ -94,8 +93,7 @@ func (t *DefaultSagaTransactionalTemplate) BranchRegister(ctx context.Context, r
 }
 
 func (t *DefaultSagaTransactionalTemplate) BranchReport(ctx context.Context, xid string, branchId int64, status branch.BranchStatus, applicationData string) error {
-       //todo Wait implement sagaResource
-       return rm.GetRMRemotingInstance().BranchReport(rm.BranchReportParam{
+       return sagarm.GetSagaResourceManager().BranchReport(ctx, rm.BranchReportParam{
                BranchType:      branch.BranchTypeSAGA,
                Xid:             xid,
                BranchId:        branchId,


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org
For additional commands, e-mail: notifications-h...@seata.apache.org

Reply via email to