This is an automated email from the ASF dual-hosted git repository. zfeng pushed a commit to branch feature/saga in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git
The following commit(s) were added to refs/heads/feature/saga by this push: new dc0a7a14 Implement SagaResource and SagaResourceManager (#855) dc0a7a14 is described below commit dc0a7a1410b3b8900e963a47db677c15103067c7 Author: Wiggins <125641755+minat...@users.noreply.github.com> AuthorDate: Sat Aug 30 14:56:51 2025 +0800 Implement SagaResource and SagaResourceManager (#855) * feat : impl SagaResource #843 * feat : impl SagaResource #843 * feat : impl SagaResourceManager #843 * feat : impl SagaResourceManager BranchRollback #843 * feat : impl handler_saga #843 * feat : impl invalid_exception_test #843 * update #843 * update:elegant code #843 * abstracting tm branch registration out to rm and add test #846 * test #846 * add licensed #846 * replace panic to return #843 * update #843 * update init and handler_saga #843 --------- Co-authored-by: FengZhang <zfc...@qq.com> --- pkg/client/client.go | 2 + pkg/saga/rm/handler_saga.go | 35 +++++ pkg/saga/rm/saga_resource.go | 52 +++++++ pkg/saga/rm/saga_resource_manager.go | 159 +++++++++++++++++++++ pkg/saga/rm/saga_resource_manager_test.go | 43 ++++++ pkg/saga/rm/state_machine_engine_holder.go | 38 +++++ .../statemachine/engine/exception/exception.go | 12 +- .../engine/exception/exception_test.go | 68 +++++++++ .../engine/exception/forward_invalid_exception.go | 32 +++++ .../exception/forward_invalid_exception_test.go | 68 +++++++++ pkg/saga/tm/default_saga_transactional_template.go | 12 +- 11 files changed, 510 insertions(+), 11 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index 54a56888..aafdaa08 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -30,6 +30,7 @@ import ( "github.com/seata/seata-go/pkg/remoting/processor/client" "github.com/seata/seata-go/pkg/rm" "github.com/seata/seata-go/pkg/rm/tcc" + saga "github.com/seata/seata-go/pkg/saga/rm" "github.com/seata/seata-go/pkg/tm" "github.com/seata/seata-go/pkg/util/log" ) @@ -87,6 +88,7 @@ func initRmClient(cfg *Config) { client.RegisterProcessor() integration.Init() tcc.InitTCC() + saga.InitSaga() at.InitAT(cfg.ClientConfig.UndoConfig, cfg.AsyncWorkerConfig) at.InitXA(cfg.ClientConfig.XaConfig) }) diff --git a/pkg/saga/rm/handler_saga.go b/pkg/saga/rm/handler_saga.go new file mode 100644 index 00000000..7bb87ff4 --- /dev/null +++ b/pkg/saga/rm/handler_saga.go @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rm + +import ( + "github.com/seata/seata-go/pkg/protocol/branch" + "github.com/seata/seata-go/pkg/rm" +) + +type RMHandlerSaga struct{} + +func (h *RMHandlerSaga) HandleUndoLogDeleteRequest(request interface{}) { + // do nothing +} +func (h *RMHandlerSaga) GetResourceManager() rm.ResourceManager { + return rm.GetRmCacheInstance().GetResourceManager(branch.BranchTypeSAGA) +} +func (h *RMHandlerSaga) GetBranchType() branch.BranchType { + return branch.BranchTypeSAGA +} diff --git a/pkg/saga/rm/saga_resource.go b/pkg/saga/rm/saga_resource.go new file mode 100644 index 00000000..2b145a4d --- /dev/null +++ b/pkg/saga/rm/saga_resource.go @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rm + +import ( + "fmt" + "github.com/seata/seata-go/pkg/protocol/branch" +) + +type SagaResource struct { + resourceGroupId string + applicationId string +} + +func (r *SagaResource) GetResourceGroupId() string { + return r.resourceGroupId +} + +func (r *SagaResource) SetResourceGroupId(resourceGroupId string) { + r.resourceGroupId = resourceGroupId +} + +func (r *SagaResource) GetResourceId() string { + return fmt.Sprintf("%s#%s", r.applicationId, r.resourceGroupId) +} + +func (r *SagaResource) GetBranchType() branch.BranchType { + return branch.BranchTypeSAGA +} + +func (r *SagaResource) GetApplicationId() string { + return r.applicationId +} + +func (r *SagaResource) SetApplicationId(applicationId string) { + r.applicationId = applicationId +} diff --git a/pkg/saga/rm/saga_resource_manager.go b/pkg/saga/rm/saga_resource_manager.go new file mode 100644 index 00000000..55f08b75 --- /dev/null +++ b/pkg/saga/rm/saga_resource_manager.go @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rm + +import ( + "bytes" + "context" + "fmt" + "log" + "sync" + + "github.com/seata/seata-go/pkg/protocol/branch" + "github.com/seata/seata-go/pkg/protocol/message" + "github.com/seata/seata-go/pkg/rm" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/exception" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" + seataErrors "github.com/seata/seata-go/pkg/util/errors" +) + +var ( + sagaResourceManagerInstance *SagaResourceManager + once sync.Once +) + +type SagaResourceManager struct { + rmRemoting *rm.RMRemoting + resourceCache sync.Map +} + +func InitSaga() { + rm.GetRmCacheInstance().RegisterResourceManager(GetSagaResourceManager()) +} + +func GetSagaResourceManager() *SagaResourceManager { + once.Do(func() { + sagaResourceManagerInstance = &SagaResourceManager{ + rmRemoting: rm.GetRMRemotingInstance(), + resourceCache: sync.Map{}, + } + }) + return sagaResourceManagerInstance +} + +func (s *SagaResourceManager) RegisterResource(resource rm.Resource) error { + if _, ok := resource.(*SagaResource); !ok { + return fmt.Errorf("register saga resource error, SagaResource is needed, param %v", resource) + } + s.resourceCache.Store(resource.GetResourceId(), resource) + return s.rmRemoting.RegisterResource(resource) + +} + +func (s *SagaResourceManager) GetCachedResources() *sync.Map { + return &s.resourceCache +} + +func (s *SagaResourceManager) GetBranchType() branch.BranchType { + return branch.BranchTypeSAGA +} + +func (s *SagaResourceManager) BranchCommit(ctx context.Context, resource rm.BranchResource) (branch.BranchStatus, error) { + engine := GetStateMachineEngine() + stMaInst, err := engine.Forward(ctx, resource.Xid, nil) + if err != nil { + if fie, ok := exception.IsForwardInvalidException(err); ok { + log.Printf("StateMachine forward failed, xid: %s, err: %v", resource.Xid, err) + if isInstanceNotExists(fie.ErrCode) { + return branch.BranchStatusPhasetwoCommitted, nil + } + } + log.Printf("StateMachine forward failed, xid: %s, err: %v", resource.Xid, err) + return branch.BranchStatusPhasetwoCommitFailedRetryable, err + } + + status := stMaInst.Status() + compStatus := stMaInst.CompensationStatus() + + switch { + case status == statelang.SU && compStatus == "": + return branch.BranchStatusPhasetwoCommitted, nil + case compStatus == statelang.SU: + return branch.BranchStatusPhasetwoRollbacked, nil + case compStatus == statelang.FA || compStatus == statelang.UN: + return branch.BranchStatusPhasetwoRollbackFailedRetryable, nil + case status == statelang.FA && compStatus == "": + return branch.BranchStatusPhaseoneFailed, nil + default: + return branch.BranchStatusPhasetwoCommitFailedRetryable, nil + } +} + +func (s *SagaResourceManager) BranchRollback(ctx context.Context, resource rm.BranchResource) (branch.BranchStatus, error) { + engine := GetStateMachineEngine() + stMaInst, err := engine.ReloadStateMachineInstance(ctx, resource.Xid) + if err != nil || stMaInst == nil { + return branch.BranchStatusPhasetwoRollbacked, nil + } + + strategy := stMaInst.StateMachine().RecoverStrategy() + appData := resource.ApplicationData + isTimeoutRollback := bytes.Equal(appData, []byte{byte(message.GlobalStatusTimeoutRollbacking)}) || bytes.Equal(appData, []byte{byte(message.GlobalStatusTimeoutRollbackRetrying)}) + + if strategy == statelang.Forward && isTimeoutRollback { + log.Printf("Retry by custom recover strategy [Forward] on timeout, SAGA global[%s]", resource.Xid) + return branch.BranchStatusPhasetwoCommitFailedRetryable, nil + } + + stMaInst, err = engine.Compensate(ctx, resource.Xid, nil) + if err == nil && stMaInst.CompensationStatus() == statelang.SU { + return branch.BranchStatusPhasetwoRollbacked, nil + } + + if fie, ok := exception.IsEngineExecutionException(err); ok { + log.Printf("StateMachine compensate failed, xid: %s, err: %v", resource.Xid, err) + if isInstanceNotExists(fie.ErrCode) { + return branch.BranchStatusPhasetwoRollbacked, nil + } + } + log.Printf("StateMachine compensate failed, xid: %s, err: %v", resource.Xid, err) + return branch.BranchStatusPhasetwoRollbackFailedRetryable, err +} + +func (s *SagaResourceManager) BranchRegister(ctx context.Context, param rm.BranchRegisterParam) (int64, error) { + return s.rmRemoting.BranchRegister(param) +} + +func (s *SagaResourceManager) BranchReport(ctx context.Context, param rm.BranchReportParam) error { + return s.rmRemoting.BranchReport(param) +} + +func (s *SagaResourceManager) LockQuery(ctx context.Context, param rm.LockQueryParam) (bool, error) { + // LockQuery is not supported for Saga resources + return false, fmt.Errorf("LockQuery is not supported for Saga resources") +} + +func (s *SagaResourceManager) UnregisterResource(resource rm.Resource) error { + // UnregisterResource is not supported for SagaResourceManager + return fmt.Errorf("UnregisterResource is not supported for SagaResourceManager") +} + +// isInstanceNotExists checks if the error code indicates StateMachineInstanceNotExists +func isInstanceNotExists(errCode string) bool { + return errCode == fmt.Sprintf("%v", seataErrors.StateMachineInstanceNotExists) +} diff --git a/pkg/saga/rm/saga_resource_manager_test.go b/pkg/saga/rm/saga_resource_manager_test.go new file mode 100644 index 00000000..61af679f --- /dev/null +++ b/pkg/saga/rm/saga_resource_manager_test.go @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rm + +import ( + "sync" + "testing" +) + +func TestGetSagaResourceManager_Singleton(t *testing.T) { + var wg sync.WaitGroup + instances := make([]*SagaResourceManager, 10) + for i := 0; i < 10; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + instances[idx] = GetSagaResourceManager() + }(i) + } + wg.Wait() + + first := instances[0] + for i, inst := range instances { + if inst != first { + t.Errorf("Instance at index %d is not the same as the first instance", i) + } + } +} diff --git a/pkg/saga/rm/state_machine_engine_holder.go b/pkg/saga/rm/state_machine_engine_holder.go new file mode 100644 index 00000000..6aaa4ee0 --- /dev/null +++ b/pkg/saga/rm/state_machine_engine_holder.go @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rm + +import ( + "github.com/seata/seata-go/pkg/saga/statemachine/engine" + "sync" +) + +var ( + stateMachineEngine engine.StateMachineEngine + stateMachineEngineOnce sync.Once +) + +func GetStateMachineEngine() engine.StateMachineEngine { + return stateMachineEngine +} + +func SetStateMachineEngine(smEngine engine.StateMachineEngine) { + stateMachineEngineOnce.Do(func() { + stateMachineEngine = smEngine + }) +} diff --git a/pkg/saga/statemachine/engine/exception/exception.go b/pkg/saga/statemachine/engine/exception/exception.go index 708bd6cd..7c2e475e 100644 --- a/pkg/saga/statemachine/engine/exception/exception.go +++ b/pkg/saga/statemachine/engine/exception/exception.go @@ -18,6 +18,7 @@ package exception import ( + perror "errors" "fmt" "github.com/seata/seata-go/pkg/util/errors" ) @@ -41,6 +42,13 @@ func NewEngineExecutionException(code errors.TransactionErrorCode, msg string, p SeataError: *seataError, } } +func IsEngineExecutionException(err error) (*EngineExecutionException, bool) { + var fie *EngineExecutionException + if perror.As(err, &fie) { + return fie, true + } + return nil, false +} func (e *EngineExecutionException) StateName() string { return e.stateName @@ -73,7 +81,3 @@ func (e *EngineExecutionException) StateInstanceId() string { func (e *EngineExecutionException) SetStateInstanceId(stateInstanceId string) { e.stateInstanceId = stateInstanceId } - -type ForwardInvalidException struct { - EngineExecutionException -} diff --git a/pkg/saga/statemachine/engine/exception/exception_test.go b/pkg/saga/statemachine/engine/exception/exception_test.go new file mode 100644 index 00000000..e7bdee84 --- /dev/null +++ b/pkg/saga/statemachine/engine/exception/exception_test.go @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package exception + +import ( + "errors" + "testing" + + pkgerr "github.com/seata/seata-go/pkg/util/errors" +) + +func TestIsEngineExecutionException(t *testing.T) { + cases := []struct { + name string + err error + wantOk bool + wantMsg string + }{ + { + name: "EngineExecutionException", + err: &EngineExecutionException{SeataError: pkgerr.SeataError{Message: "engine error"}}, + wantOk: true, + wantMsg: "engine error", + }, + { + name: "Other error", + err: errors.New("some other error"), + wantOk: false, + wantMsg: "", + }, + { + name: "nil error", + err: nil, + wantOk: false, + wantMsg: "", + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + fie, ok := IsEngineExecutionException(c.err) + if ok != c.wantOk { + t.Errorf("expected ok=%v, got %v", c.wantOk, ok) + } + if ok && fie.SeataError.Message != c.wantMsg { + t.Errorf("expected Message=%q, got %q", c.wantMsg, fie.SeataError.Message) + } + if !ok && fie != nil { + t.Errorf("expected fie=nil, got %v", fie) + } + }) + } +} diff --git a/pkg/saga/statemachine/engine/exception/forward_invalid_exception.go b/pkg/saga/statemachine/engine/exception/forward_invalid_exception.go new file mode 100644 index 00000000..ac79903e --- /dev/null +++ b/pkg/saga/statemachine/engine/exception/forward_invalid_exception.go @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package exception + +import "errors" + +type ForwardInvalidException struct { + EngineExecutionException +} + +func IsForwardInvalidException(err error) (*ForwardInvalidException, bool) { + var fie *ForwardInvalidException + if errors.As(err, &fie) { + return fie, true + } + return nil, false +} diff --git a/pkg/saga/statemachine/engine/exception/forward_invalid_exception_test.go b/pkg/saga/statemachine/engine/exception/forward_invalid_exception_test.go new file mode 100644 index 00000000..93779a98 --- /dev/null +++ b/pkg/saga/statemachine/engine/exception/forward_invalid_exception_test.go @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package exception + +import ( + "errors" + "testing" + + pkgerr "github.com/seata/seata-go/pkg/util/errors" +) + +func TestIsForwardInvalidException(t *testing.T) { + cases := []struct { + name string + err error + wantOk bool + wantMsg string + }{ + { + name: "ForwardInvalidException", + err: &ForwardInvalidException{EngineExecutionException: EngineExecutionException{SeataError: pkgerr.SeataError{Message: "forward invalid"}}}, + wantOk: true, + wantMsg: "forward invalid", + }, + { + name: "Other error", + err: errors.New("some other error"), + wantOk: false, + wantMsg: "", + }, + { + name: "nil error", + err: nil, + wantOk: false, + wantMsg: "", + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + fie, ok := IsForwardInvalidException(c.err) + if ok != c.wantOk { + t.Errorf("expected ok=%v, got %v", c.wantOk, ok) + } + if ok && fie.SeataError.Message != c.wantMsg { + t.Errorf("expected Message=%q, got %q", c.wantMsg, fie.SeataError.Message) + } + if !ok && fie != nil { + t.Errorf("expected fie=nil, got %v", fie) + } + }) + } +} diff --git a/pkg/saga/tm/default_saga_transactional_template.go b/pkg/saga/tm/default_saga_transactional_template.go index d9bf1751..dbf30f1c 100644 --- a/pkg/saga/tm/default_saga_transactional_template.go +++ b/pkg/saga/tm/default_saga_transactional_template.go @@ -19,13 +19,13 @@ package tm import ( "context" - "github.com/seata/seata-go/pkg/rm" - "time" - "github.com/seata/seata-go/pkg/protocol/branch" "github.com/seata/seata-go/pkg/protocol/message" + "github.com/seata/seata-go/pkg/rm" + sagarm "github.com/seata/seata-go/pkg/saga/rm" "github.com/seata/seata-go/pkg/tm" "github.com/seata/seata-go/pkg/util/log" + "time" ) type DefaultSagaTransactionalTemplate struct { @@ -82,8 +82,7 @@ func (t *DefaultSagaTransactionalTemplate) ReportTransaction(ctx context.Context } func (t *DefaultSagaTransactionalTemplate) BranchRegister(ctx context.Context, resourceId string, clientId string, xid string, applicationData string, lockKeys string) (int64, error) { - //todo Wait implement sagaResource - return rm.GetRMRemotingInstance().BranchRegister(rm.BranchRegisterParam{ + return sagarm.GetSagaResourceManager().BranchRegister(ctx, rm.BranchRegisterParam{ BranchType: branch.BranchTypeSAGA, ResourceId: resourceId, Xid: xid, @@ -94,8 +93,7 @@ func (t *DefaultSagaTransactionalTemplate) BranchRegister(ctx context.Context, r } func (t *DefaultSagaTransactionalTemplate) BranchReport(ctx context.Context, xid string, branchId int64, status branch.BranchStatus, applicationData string) error { - //todo Wait implement sagaResource - return rm.GetRMRemotingInstance().BranchReport(rm.BranchReportParam{ + return sagarm.GetSagaResourceManager().BranchReport(ctx, rm.BranchReportParam{ BranchType: branch.BranchTypeSAGA, Xid: xid, BranchId: branchId, --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org