This is an automated email from the ASF dual-hosted git repository. xjlgod pushed a commit to branch feature/saga in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git
The following commit(s) were added to refs/heads/feature/saga by this push: new 0b467cb1 Decouple the transaction from the statelog (#841) 0b467cb1 is described below commit 0b467cb183fc1f84ec5f68aafcc0ee10856d174e Author: lxfeng1997 <33981743+lxfeng1...@users.noreply.github.com> AuthorDate: Sun Jun 22 15:13:41 2025 +0800 Decouple the transaction from the statelog (#841) * Decouple the transaction from the statelog * Decouple the transaction from the statelog * Decouple the transaction from the statelog --- pkg/saga/statemachine/store/db/statelog.go | 89 ++++++----- pkg/saga/tm/default_saga_transactional_template.go | 176 +++++++++++++++++++++ pkg/saga/tm/saga_transactional_template.go | 44 ++++++ pkg/tm/transaction_hook.go | 34 ++++ pkg/tm/transaction_hook_manager.go | 60 +++++++ 5 files changed, 365 insertions(+), 38 deletions(-) diff --git a/pkg/saga/statemachine/store/db/statelog.go b/pkg/saga/statemachine/store/db/statelog.go index ccf70798..4d594387 100644 --- a/pkg/saga/statemachine/store/db/statelog.go +++ b/pkg/saga/statemachine/store/db/statelog.go @@ -21,10 +21,6 @@ import ( "context" "database/sql" "fmt" - "github.com/seata/seata-go/pkg/saga/statemachine/engine" - "github.com/seata/seata-go/pkg/saga/statemachine/engine/config" - "github.com/seata/seata-go/pkg/saga/statemachine/engine/pcext" - "github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl" "regexp" "strconv" "strings" @@ -34,12 +30,16 @@ import ( constant2 "github.com/seata/seata-go/pkg/constant" "github.com/seata/seata-go/pkg/protocol/branch" "github.com/seata/seata-go/pkg/protocol/message" - "github.com/seata/seata-go/pkg/rm" "github.com/seata/seata-go/pkg/saga/statemachine/constant" + "github.com/seata/seata-go/pkg/saga/statemachine/engine" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/config" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/pcext" "github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence" "github.com/seata/seata-go/pkg/saga/statemachine/engine/serializer" + "github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl" "github.com/seata/seata-go/pkg/saga/statemachine/statelang" "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state" + sagaTm "github.com/seata/seata-go/pkg/saga/tm" "github.com/seata/seata-go/pkg/tm" "github.com/seata/seata-go/pkg/util/log" ) @@ -83,6 +83,8 @@ type StateLogStore struct { updateStateExecutionStatusSql string queryStateInstancesByMachineInstanceIdSql string getStateInstanceByIdAndMachineInstanceIdSql string + + sagaTransactionalTemplate sagaTm.SagaTransactionalTemplate } func NewStateLogStore(db *sql.DB, tablePrefix string) *StateLogStore { @@ -165,6 +167,10 @@ func (s *StateLogStore) RecordStateMachineStarted(ctx context.Context, machineIn } func (s *StateLogStore) beginTransaction(ctx context.Context, machineInstance statelang.StateMachineInstance, context process_ctrl.ProcessContext) error { + if s.sagaTransactionalTemplate == nil { + log.Debugf("begin transaction fail, sagaTransactionalTemplate is not existence") + return nil + } cfg, ok := context.GetVariable(constant.VarNameStateMachineConfig).(engine.StateMachineConfig) if !ok { return errors.New("begin transaction fail, stateMachineConfig is required in context") @@ -177,16 +183,20 @@ func (s *StateLogStore) beginTransaction(ctx context.Context, machineInstance st } }() - tm.SetTxRole(ctx, tm.Launcher) - tm.SetTxStatus(ctx, message.GlobalStatusUnKnown) - tm.SetTxName(ctx, constant.SagaTransNamePrefix+machineInstance.StateMachine().Name()) - - err := tm.GetGlobalTransactionManager().Begin(ctx, time.Duration(cfg.TransOperationTimeout())) + txName := constant.SagaTransNamePrefix + machineInstance.StateMachine().Name() + gtx, err := s.sagaTransactionalTemplate.BeginTransaction(ctx, time.Duration(cfg.GetTransOperationTimeout()), txName) if err != nil { return err } + xid := gtx.Xid + machineInstance.SetID(xid) + + context.SetVariable(constant.VarNameGlobalTx, gtx) - machineInstance.SetID(tm.GetXID(ctx)) + machineContext := machineInstance.Context() + if machineContext != nil { + machineContext[constant.VarNameGlobalTx] = gtx + } return nil } @@ -238,7 +248,7 @@ func (s *StateLogStore) RecordStateMachineFinished(ctx context.Context, machineI return errors.New("stateMachineConfig is required in context") } - if pcext.IsTimeout(machineInstance.UpdatedTime(), cfg.TransOperationTimeout()) { + if pcext.IsTimeout(machineInstance.UpdatedTime(), cfg.GetTransOperationTimeout()) { log.Warnf("StateMachineInstance[%s] is execution timeout, skip report transaction finished to server.", machineInstance.ID()) } else if machineInstance.ParentID() == "" { //if parentId is not null, machineInstance is a SubStateMachine, do not report global transaction. @@ -260,7 +270,11 @@ func (s *StateLogStore) reportTransactionFinished(ctx context.Context, machineIn } }() - globalTransaction, err := s.getGlobalTransaction(machineInstance, context) + if s.sagaTransactionalTemplate == nil { + log.Debugf("report transaction finished fail, sagaTransactionalTemplate is not existence") + return nil + } + globalTransaction, err := s.getGlobalTransaction(ctx, machineInstance, context) if err != nil { log.Errorf("Failed to get global transaction: %v", err) return err @@ -282,14 +296,14 @@ func (s *StateLogStore) reportTransactionFinished(ctx context.Context, machineIn } globalTransaction.TxStatus = globalStatus - _, err = tm.GetGlobalTransactionManager().GlobalReport(ctx, globalTransaction) + err = s.sagaTransactionalTemplate.ReportTransaction(ctx, globalTransaction) if err != nil { return err } return nil } -func (s *StateLogStore) getGlobalTransaction(machineInstance statelang.StateMachineInstance, context process_ctrl.ProcessContext) (*tm.GlobalTransaction, error) { +func (s *StateLogStore) getGlobalTransaction(ctx context.Context, machineInstance statelang.StateMachineInstance, context process_ctrl.ProcessContext) (*tm.GlobalTransaction, error) { globalTransaction, ok := context.GetVariable(constant.VarNameGlobalTx).(*tm.GlobalTransaction) if ok { return globalTransaction, nil @@ -302,12 +316,10 @@ func (s *StateLogStore) getGlobalTransaction(machineInstance statelang.StateMach } else { xid = parentId[:strings.LastIndex(parentId, constant.SeperatorParentId)] } - globalTransaction = &tm.GlobalTransaction{ - Xid: xid, - TxStatus: message.GlobalStatusUnKnown, - TxRole: tm.Launcher, + globalTransaction, err := s.sagaTransactionalTemplate.ReloadTransaction(ctx, xid) + if err != nil { + return nil, err } - context.SetVariable(constant.VarNameGlobalTx, globalTransaction) return globalTransaction, nil } @@ -354,7 +366,7 @@ func (s *StateLogStore) RecordStateStarted(ctx context.Context, stateInstance st stateInstance.SetID(s.generateCompensateStateInstanceId(stateInstance, isUpdateMode)) } else { // register branch - s.branchRegister(stateInstance, context) + s.branchRegister(ctx, stateInstance, context) } if stateInstance.ID() == "" && s.seqGenerator != nil { @@ -461,7 +473,7 @@ func (s *StateLogStore) generateCompensateStateInstanceId(stateInstance statelan return fmt.Sprintf("%s-%d", originalCompensateStateInstId, maxIndex) } -func (s *StateLogStore) branchRegister(stateInstance statelang.StateInstance, context process_ctrl.ProcessContext) error { +func (s *StateLogStore) branchRegister(ctx context.Context, stateInstance statelang.StateInstance, context process_ctrl.ProcessContext) error { cfg, ok := context.GetVariable(constant.VarNameStateMachineConfig).(config.DefaultStateMachineConfig) if !ok { return errors.New("stateMachineConfig is required in context") @@ -482,7 +494,7 @@ func (s *StateLogStore) branchRegister(stateInstance statelang.StateInstance, co } }() - globalTransaction, err := s.getGlobalTransaction(machineInstance, context) + globalTransaction, err := s.getGlobalTransaction(ctx, machineInstance, context) if err != nil { return err } @@ -491,11 +503,8 @@ func (s *StateLogStore) branchRegister(stateInstance statelang.StateInstance, co return err } - branchId, err := rm.GetRMRemotingInstance().BranchRegister(rm.BranchRegisterParam{ - BranchType: branch.BranchTypeSAGA, - ResourceId: machineInstance.StateMachine().Name() + "#" + stateInstance.Name(), - Xid: globalTransaction.Xid, - }) + resourceId := machineInstance.StateMachine().Name() + "#" + stateInstance.Name() + branchId, err := s.sagaTransactionalTemplate.BranchRegister(ctx, resourceId, "", globalTransaction.Xid, "", "") if err != nil { return err } @@ -546,7 +555,7 @@ func (s *StateLogStore) RecordStateFinished(ctx context.Context, stateInstance s // A switch to skip branch report on branch success, in order to optimize performance cfg, ok := context.GetVariable(constant.VarNameStateMachineConfig).(config.DefaultStateMachineConfig) if !(ok && !cfg.IsRmReportSuccessEnable() && statelang.SU == stateInstance.Status()) { - err = s.branchReport(stateInstance, context) + err = s.branchReport(ctx, stateInstance, context) return err } @@ -554,7 +563,7 @@ func (s *StateLogStore) RecordStateFinished(ctx context.Context, stateInstance s } -func (s *StateLogStore) branchReport(stateInstance statelang.StateInstance, context process_ctrl.ProcessContext) error { +func (s *StateLogStore) branchReport(ctx context.Context, stateInstance statelang.StateInstance, context process_ctrl.ProcessContext) error { cfg, ok := context.GetVariable(constant.VarNameStateMachineConfig).(config.DefaultStateMachineConfig) if ok && !cfg.IsSagaBranchRegisterEnable() { log.Debugf("sagaBranchRegisterEnable = false, skip branch report. state[%s]", stateInstance.Name()) @@ -624,7 +633,7 @@ func (s *StateLogStore) branchReport(stateInstance statelang.StateInstance, cont } }() - globalTransaction, err := s.getGlobalTransaction(stateInstance.StateMachineInstance(), context) + globalTransaction, err := s.getGlobalTransaction(ctx, stateInstance.StateMachineInstance(), context) if err != nil { return err } @@ -634,13 +643,10 @@ func (s *StateLogStore) branchReport(stateInstance statelang.StateInstance, cont } branchId, err := strconv.ParseInt(originalStateInst.ID(), 10, 0) - err = rm.GetRMRemotingInstance().BranchReport(rm.BranchReportParam{ - BranchType: branch.BranchTypeSAGA, - Xid: globalTransaction.Xid, - BranchId: branchId, - Status: branchStatus, - }) - return err + if err != nil { + return err + } + return s.sagaTransactionalTemplate.BranchReport(ctx, globalTransaction.Xid, branchId, branchStatus, "") } func (s *StateLogStore) findOutOriginalStateInstanceOfRetryState(stateInstance statelang.StateInstance) statelang.StateInstance { @@ -851,6 +857,13 @@ func (s *StateLogStore) ClearUp(context process_ctrl.ProcessContext) { context.RemoveVariable(constant2.BranchTypeKey) } +func (s *StateLogStore) SetSagaTransactionalTemplate(sagaTransactionalTemplate sagaTm.SagaTransactionalTemplate) { + s.sagaTransactionalTemplate = sagaTransactionalTemplate +} +func (s *StateLogStore) GetSagaTransactionalTemplate() sagaTm.SagaTransactionalTemplate { + return s.sagaTransactionalTemplate +} + func execStateMachineInstanceStatementForInsert(obj statelang.StateMachineInstance, stmt *sql.Stmt) (int64, error) { result, err := stmt.Exec( obj.ID(), diff --git a/pkg/saga/tm/default_saga_transactional_template.go b/pkg/saga/tm/default_saga_transactional_template.go new file mode 100644 index 00000000..d9bf1751 --- /dev/null +++ b/pkg/saga/tm/default_saga_transactional_template.go @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package tm + +import ( + "context" + "github.com/seata/seata-go/pkg/rm" + "time" + + "github.com/seata/seata-go/pkg/protocol/branch" + "github.com/seata/seata-go/pkg/protocol/message" + "github.com/seata/seata-go/pkg/tm" + "github.com/seata/seata-go/pkg/util/log" +) + +type DefaultSagaTransactionalTemplate struct { + applicationId string + txServiceGroup string +} + +func (t *DefaultSagaTransactionalTemplate) CommitTransaction(ctx context.Context, gtr *tm.GlobalTransaction) error { + t.triggerBeforeCommit() + err := tm.GetGlobalTransactionManager().Commit(ctx, gtr) + if err != nil { + return err + } + t.triggerAfterCommit() + return nil +} + +func (t *DefaultSagaTransactionalTemplate) RollbackTransaction(ctx context.Context, gtr *tm.GlobalTransaction) error { + t.triggerBeforeRollback() + err := tm.GetGlobalTransactionManager().Rollback(ctx, gtr) + if err != nil { + return err + } + t.triggerAfterRollback() + return nil +} + +func (t *DefaultSagaTransactionalTemplate) BeginTransaction(ctx context.Context, timeout time.Duration, txName string) (*tm.GlobalTransaction, error) { + t.triggerBeforeBegin() + tm.SetTxName(ctx, txName) + err := tm.GetGlobalTransactionManager().Begin(ctx, timeout) + if err != nil { + return nil, err + } + t.triggerAfterBegin() + return tm.GetTx(ctx), nil +} + +func (t *DefaultSagaTransactionalTemplate) ReloadTransaction(ctx context.Context, xid string) (*tm.GlobalTransaction, error) { + return &tm.GlobalTransaction{ + Xid: xid, + TxStatus: message.GlobalStatusUnKnown, + TxRole: tm.Launcher, + }, nil +} + +func (t *DefaultSagaTransactionalTemplate) ReportTransaction(ctx context.Context, gtr *tm.GlobalTransaction) error { + _, err := tm.GetGlobalTransactionManager().GlobalReport(ctx, gtr) + if err != nil { + return err + } + t.triggerAfterCommit() + return nil +} + +func (t *DefaultSagaTransactionalTemplate) BranchRegister(ctx context.Context, resourceId string, clientId string, xid string, applicationData string, lockKeys string) (int64, error) { + //todo Wait implement sagaResource + return rm.GetRMRemotingInstance().BranchRegister(rm.BranchRegisterParam{ + BranchType: branch.BranchTypeSAGA, + ResourceId: resourceId, + Xid: xid, + ClientId: clientId, + ApplicationData: applicationData, + LockKeys: lockKeys, + }) +} + +func (t *DefaultSagaTransactionalTemplate) BranchReport(ctx context.Context, xid string, branchId int64, status branch.BranchStatus, applicationData string) error { + //todo Wait implement sagaResource + return rm.GetRMRemotingInstance().BranchReport(rm.BranchReportParam{ + BranchType: branch.BranchTypeSAGA, + Xid: xid, + BranchId: branchId, + Status: status, + ApplicationData: applicationData, + }) +} + +func (t *DefaultSagaTransactionalTemplate) CleanUp(ctx context.Context) { + tm.GetTransactionalHookManager().Clear() +} + +func (t *DefaultSagaTransactionalTemplate) getCurrentHooks() []tm.TransactionalHook { + return tm.GetTransactionalHookManager().GetHooks() +} + +func (t *DefaultSagaTransactionalTemplate) triggerBeforeBegin() { + for _, hook := range t.getCurrentHooks() { + err := hook.BeforeBegin() + if nil != err { + log.Error(err) + } + } +} + +func (t *DefaultSagaTransactionalTemplate) triggerAfterBegin() { + for _, hook := range t.getCurrentHooks() { + err := hook.AfterBegin() + if nil != err { + log.Error(err) + } + } +} + +func (t *DefaultSagaTransactionalTemplate) triggerBeforeRollback() { + for _, hook := range t.getCurrentHooks() { + err := hook.BeforeRollback() + if nil != err { + log.Error(err) + } + } +} + +func (t *DefaultSagaTransactionalTemplate) triggerAfterRollback() { + for _, hook := range t.getCurrentHooks() { + err := hook.AfterRollback() + if nil != err { + log.Error(err) + } + } +} + +func (t *DefaultSagaTransactionalTemplate) triggerBeforeCommit() { + for _, hook := range t.getCurrentHooks() { + err := hook.BeforeCommit() + if nil != err { + log.Error(err) + } + } +} + +func (t *DefaultSagaTransactionalTemplate) triggerAfterCommit() { + for _, hook := range t.getCurrentHooks() { + err := hook.AfterCommit() + if nil != err { + log.Error(err) + } + } +} + +func (t *DefaultSagaTransactionalTemplate) triggerAfterCompletion() { + for _, hook := range t.getCurrentHooks() { + err := hook.AfterCompletion() + if nil != err { + log.Error(err) + } + } +} diff --git a/pkg/saga/tm/saga_transactional_template.go b/pkg/saga/tm/saga_transactional_template.go new file mode 100644 index 00000000..fd0ae60d --- /dev/null +++ b/pkg/saga/tm/saga_transactional_template.go @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package tm + +import ( + "context" + "time" + + "github.com/seata/seata-go/pkg/protocol/branch" + "github.com/seata/seata-go/pkg/tm" +) + +type SagaTransactionalTemplate interface { + CommitTransaction(ctx context.Context, gtr *tm.GlobalTransaction) error + + RollbackTransaction(ctx context.Context, gtr *tm.GlobalTransaction) error + + BeginTransaction(ctx context.Context, timeout time.Duration, txName string) (*tm.GlobalTransaction, error) + + ReloadTransaction(ctx context.Context, xid string) (*tm.GlobalTransaction, error) + + ReportTransaction(ctx context.Context, gtr *tm.GlobalTransaction) error + + BranchRegister(ctx context.Context, resourceId string, clientId string, xid string, applicationData string, lockKeys string) (int64, error) + + BranchReport(ctx context.Context, xid string, branchId int64, status branch.BranchStatus, applicationData string) error + + CleanUp(ctx context.Context) +} diff --git a/pkg/tm/transaction_hook.go b/pkg/tm/transaction_hook.go new file mode 100644 index 00000000..42a36cec --- /dev/null +++ b/pkg/tm/transaction_hook.go @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package tm + +type TransactionalHook interface { + BeforeBegin() error + + AfterBegin() error + + BeforeCommit() error + + AfterCommit() error + + BeforeRollback() error + + AfterRollback() error + + AfterCompletion() error +} diff --git a/pkg/tm/transaction_hook_manager.go b/pkg/tm/transaction_hook_manager.go new file mode 100644 index 00000000..9d5b8261 --- /dev/null +++ b/pkg/tm/transaction_hook_manager.go @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package tm + +import ( + "github.com/pkg/errors" + "sync" +) + +var ( + transactionalHookManager *TransactionalHookManager + onceTransactionalHookManager sync.Once +) + +type TransactionalHookManager struct { + transactionalHook []TransactionalHook +} + +func GetTransactionalHookManager() *TransactionalHookManager { + if transactionalHookManager == nil { + onceTransactionalHookManager.Do(func() { + transactionalHookManager = &TransactionalHookManager{ + transactionalHook: make([]TransactionalHook, 0), + } + }) + } + + return transactionalHookManager +} + +func (h *TransactionalHookManager) GetHooks() []TransactionalHook { + return h.transactionalHook +} + +func (h *TransactionalHookManager) RegisterHook(hook TransactionalHook) error { + if nil == hook { + return errors.New("transactionHook must not be null") + } + h.transactionalHook = append(h.transactionalHook, hook) + return nil +} + +func (h *TransactionalHookManager) Clear() { + h.transactionalHook = h.transactionalHook[0:0] +} --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org