tew-axiom opened a new issue, #1034:
URL: https://github.com/apache/incubator-seata-go/issues/1034
### 讨论详情
## Issue 1: GlobalTransactionManager 全局单例导致测试困难
### 问题位置
**文件**: `pkg/tm/global_transaction.go`
### 问题代码
```go
var (
// globalTransactionManager singleton ResourceManagerFacade
globalTransactionManager *GlobalTransactionManager
onceGlobalTransactionManager = &sync.Once{}
)
func GetGlobalTransactionManager() *GlobalTransactionManager {
if globalTransactionManager == nil {
onceGlobalTransactionManager.Do(func() {
globalTransactionManager = &GlobalTransactionManager{}
})
}
return globalTransactionManager
}
type GlobalTransactionManager struct{}
```
### 问题分析
1. **单元测试隔离困难**
- 全局单例在测试中无法为每个测试用例创建独立实例
- 测试之间会共享状态,导致测试相互影响
- 无法并行运行测试,因为所有测试共享同一个实例
2. **Mock 困难**
- `GlobalTransactionManager` 直接依赖 `getty.GetGettyRemotingClient()`(第 57 行)
- 测试时无法注入 Mock 的远程客户端
- 必须启动真实的网络连接才能测试
3. **并发安全隐患**
- 虽然使用了 `sync.Once` 保证初始化安全
- 但 `GlobalTransactionManager` 本身是空结构体,所有方法都依赖外部全局状态
- 在高并发场景下,多个 goroutine 同时调用可能产生竞态条件
### 影响范围
- `pkg/tm/global_transaction.go:52-194` - 所有事务管理方法
- 所有调用 `GetGlobalTransactionManager()` 的代码
- 事务管理的单元测试
### 重构必要性
**理由**:
1. **核心功能**: GlobalTransactionManager 是事务管理的核心组件,质量要求极高
2. **测试覆盖**: 当前缺少完整的单元测试,重构后可以补充测试
3. **可维护性**: 全局单例使得代码难以理解和维护
4. **扩展性**: 未来可能需要支持多租户或多集群,全局单例无法满足需求
### 重构建议
```go
// 定义接口
type TransactionManager interface {
Begin(ctx context.Context, timeout time.Duration) error
Commit(ctx context.Context, gtr *GlobalTransaction) error
Rollback(ctx context.Context, gtr *GlobalTransaction) error
GlobalReport(ctx context.Context, gtr *GlobalTransaction)
(message.GlobalStatus, error)
}
// 实现类依赖注入
type DefaultTransactionManager struct {
remotingClient RemotingClient // 注入依赖
config *TmConfig
}
func NewTransactionManager(client RemotingClient, config *TmConfig)
TransactionManager {
return &DefaultTransactionManager{
remotingClient: client,
config: config,
}
}
// 在应用初始化时创建实例
func InitApp() {
remotingClient := getty.NewRemotingClient(...)
tmConfig := loadConfig()
tm := NewTransactionManager(remotingClient, tmConfig)
// 通过 Context 或结构体字段传递
}
```
---
## Issue 2: GettyRemotingClient 全局单例阻碍网络层测试
### 问题位置
**文件**: `pkg/remoting/getty/getty_client.go`
### 问题代码
```go
var (
gettyRemotingClient *GettyRemotingClient
onceGettyRemotingClient = &sync.Once{}
)
type GettyRemotingClient struct {
idGenerator *atomic.Uint32
gettyRemoting *GettyRemoting
}
func GetGettyRemotingClient() *GettyRemotingClient {
if gettyRemotingClient == nil {
onceGettyRemotingClient.Do(func() {
gettyRemotingClient = &GettyRemotingClient{
idGenerator: &atomic.Uint32{},
gettyRemoting: newGettyRemoting(),
}
})
}
return gettyRemotingClient
}
```
### 问题分析
1. **网络测试困难**
- 无法在测试中使用 Mock 的网络连接
- 测试必须依赖真实的 Getty 网络库
- 无法模拟网络故障、超时等异常场景
2. **ID 生成器状态污染**
- `idGenerator` 是全局共享的,测试之间会相互影响
- 无法重置 ID 生成器状态
- 并发测试时 ID 可能冲突
3. **资源泄漏风险**
- 全局单例在程序生命周期内一直存在
- 无法主动释放网络连接资源
- 测试结束后资源无法清理
### 影响范围
- `pkg/tm/global_transaction.go:57,102,144,183` - 事务管理器调用
- `pkg/rm/rm_remoting.go:58,80,104,127` - 资源管理器调用
- 所有需要与 TC 通信的代码
### 重构必要性
**理由**:
1. **网络层核心**: 所有与 TC 的通信都依赖此组件
2. **测试覆盖**: 当前网络层测试几乎为空
3. **故障模拟**: 无法测试网络故障场景
4. **性能测试**: 无法进行压力测试和性能优化
### 重构建议
```go
// 定义接口
type RemotingClient interface {
SendAsyncRequest(msg interface{}) error
SendSyncRequest(msg interface{}) (interface{}, error)
SendAsyncResponse(msgID int32, msg interface{}) error
}
// 实现类
type GettyRemotingClient struct {
idGenerator *atomic.Uint32
gettyRemoting *GettyRemoting
}
func NewGettyRemotingClient(config *Config) RemotingClient {
return &GettyRemotingClient{
idGenerator: &atomic.Uint32{},
gettyRemoting: newGettyRemoting(config),
}
}
// Mock 实现用于测试
type MockRemotingClient struct {
responses map[int32]interface{}
errors map[int32]error
}
func (m *MockRemotingClient) SendSyncRequest(msg interface{}) (interface{},
error) {
// 返回预设的响应或错误
}
```
---
## Issue 3: CodecManager 全局单例限制编解码器扩展
### 问题位置
**文件**: `pkg/protocol/codec/codec.go`
### 问题代码
```go
var (
codecManager *CodecManager
onceCodecManager = &sync.Once{}
)
func GetCodecManager() *CodecManager {
if codecManager == nil {
onceCodecManager.Do(func() {
codecManager = &CodecManager{
codecMap: make(map[CodecType]map[message.MessageType]Codec,
0),
}
})
}
return codecManager
}
type CodecManager struct {
mutex sync.Mutex
codecMap map[CodecType]map[message.MessageType]Codec
}
```
### 问题分析
1. **编解码器注册时机问题**
- 所有编解码器在 `Init()` 函数中注册(第 115-147 行)
- `Init()` 必须在使用前调用,否则会找不到编解码器
- 初始化顺序依赖不明确,容易出错
2. **扩展性差**
- 用户无法注册自定义编解码器
- 无法在运行时动态添加或替换编解码器
- 测试时无法使用简化的编解码器
3. **并发性能问题**
- 使用 `sync.Mutex` 保护 `codecMap`
- 每次编解码都需要获取锁,影响性能
- 实际上 `codecMap` 在初始化后是只读的,不需要锁
### 影响范围
- `pkg/protocol/codec/codec.go:84-113` - 编解码方法
- `pkg/remoting/getty/readwriter.go` - 网络读写
- 所有消息的序列化和反序列化
### 重构必要性
**理由**:
1. **性能影响**: 编解码是高频操作,锁竞争影响性能
2. **扩展性**: 无法支持自定义协议和编解码器
3. **测试困难**: 无法在测试中使用简化的编解码器
4. **初始化复杂**: 依赖全局 Init() 函数,容易遗漏
### 重构建议
```go
// 使用 sync.Map 替代 mutex + map
type CodecManager struct {
codecMap sync.Map // CodecType -> map[MessageType]Codec
}
func NewCodecManager() *CodecManager {
cm := &CodecManager{}
// 注册默认编解码器
cm.RegisterDefaultCodecs()
return cm
}
func (c *CodecManager) RegisterCodec(codecType CodecType, codec Codec) {
typeMap, _ := c.codecMap.LoadOrStore(codecType, &sync.Map{})
typeMap.(*sync.Map).Store(codec.GetMessageType(), codec)
}
func (c *CodecManager) GetCodec(codecType CodecType, msgType
message.MessageType) Codec {
if typeMap, ok := c.codecMap.Load(codecType); ok {
if codec, ok := typeMap.(*sync.Map).Load(msgType); ok {
return codec.(Codec)
}
}
return nil
}
```
---
## Issue 4: ResourceManagerCache 全局单例导致资源管理混乱
### 问题位置
**文件**: `pkg/rm/rm_cache.go`
### 问题代码
```go
var (
// singletone ResourceManagerCache
rmCacheInstance *ResourceManagerCache
onceRMFacade = &sync.Once{}
)
func GetRmCacheInstance() *ResourceManagerCache {
if rmCacheInstance == nil {
onceRMFacade.Do(func() {
rmCacheInstance = &ResourceManagerCache{}
})
}
return rmCacheInstance
}
type ResourceManagerCache struct {
// BranchType -> ResourceManagerCache
resourceManagerMap sync.Map
}
```
### 问题分析
1. **Panic 风险**
- `GetResourceManager` 方法在找不到 ResourceManager 时直接 panic(第 54 行)
- 没有提供优雅的错误处理机制
- 在生产环境中可能导致程序崩溃
2. **注册时机不明确**
- ResourceManager 的注册分散在各个模块的初始化函数中
- 注册顺序依赖不清晰
- 容易出现使用时尚未注册的情况
3. **测试隔离问题**
- 全局缓存在测试之间共享
- 无法为不同测试注册不同的 ResourceManager
- 测试相互影响
### 影响范围
- `pkg/rm/tcc/tcc_service.go:60` - TCC 资源管理器注册
- `pkg/datasource/sql/datasource/datasource_manager.go:46` - 数据源管理器获取
- 所有分支事务的提交和回滚操作
### 重构必要性
**理由**:
1. **稳定性**: Panic 风险严重影响系统稳定性
2. **可测试性**: 无法进行有效的单元测试
3. **可维护性**: 注册逻辑分散,难以追踪
4. **扩展性**: 无法支持动态注册和卸载
### 重构建议
```go
type ResourceManagerRegistry interface {
Register(branchType branch.BranchType, rm ResourceManager) error
Get(branchType branch.BranchType) (ResourceManager, error)
Unregister(branchType branch.BranchType) error
}
type DefaultResourceManagerRegistry struct {
managers sync.Map
}
func NewResourceManagerRegistry() ResourceManagerRegistry {
return &DefaultResourceManagerRegistry{}
}
func (r *DefaultResourceManagerRegistry) Register(branchType
branch.BranchType, rm ResourceManager) error {
if rm == nil {
return errors.New("resource manager cannot be nil")
}
r.managers.Store(branchType, rm)
return nil
}
func (r *DefaultResourceManagerRegistry) Get(branchType branch.BranchType)
(ResourceManager, error) {
if rm, ok := r.managers.Load(branchType); ok {
return rm.(ResourceManager), nil
}
return nil, fmt.Errorf("no resource manager for branch type: %v",
branchType)
}
```
---
## Issue 5: RMRemoting 全局单例阻碍 RM 通信测试
### 问题位置
**文件**: `pkg/rm/rm_remoting.go`
### 问题代码
```go
var (
rmRemoting *RMRemoting
onceGettyRemoting = &sync.Once{}
)
func GetRMRemotingInstance() *RMRemoting {
if rmRemoting == nil {
onceGettyRemoting.Do(func() {
rmRemoting = &RMRemoting{}
})
}
return rmRemoting
}
type RMRemoting struct{}
```
### 问题分析
1. **依赖硬编码**
- `RMRemoting` 的所有方法都直接调用 `getty.GetGettyRemotingClient()`
- 无法注入 Mock 的网络客户端
- 测试必须依赖真实的网络连接
2. **配置硬编码**
- 使用全局变量 `rmConfig`(第 122-123 行)
- 无法为不同的 RM 实例使用不同的配置
- 测试时无法使用测试配置
3. **空结构体设计问题**
- `RMRemoting` 是空结构体,没有任何字段
- 所有方法都是无状态的,完全可以改为包级函数
- 使用单例模式没有意义
### 影响范围
- `pkg/rm/tcc/tcc_service.go:109` - TCC 分支注册
- `pkg/datasource/sql/exec/at/` - AT 模式分支注册
- 所有分支事务的注册、报告、锁查询操作
### 重构必要性
**理由**:
1. **测试覆盖**: RM 通信是核心功能,必须有完整测试
2. **故障模拟**: 需要测试网络故障、超时等场景
3. **性能测试**: 需要进行压力测试和性能优化
4. **代码质量**: 空结构体单例是反模式
### 重构建议
```go
type RMRemoting interface {
BranchRegister(ctx context.Context, param BranchRegisterParam) (int64,
error)
BranchReport(ctx context.Context, param BranchReportParam) error
LockQuery(ctx context.Context, param LockQueryParam) (bool, error)
RegisterResource(resource Resource) error
}
type DefaultRMRemoting struct {
remotingClient RemotingClient
config *RmConfig
}
func NewRMRemoting(client RemotingClient, config *RmConfig) RMRemoting {
return &DefaultRMRemoting{
remotingClient: client,
config: config,
}
}
func (r *DefaultRMRemoting) BranchRegister(ctx context.Context, param
BranchRegisterParam) (int64, error) {
request := message.BranchRegisterRequest{
Xid: param.Xid,
LockKey: param.LockKeys,
ResourceId: param.ResourceId,
BranchType: param.BranchType,
ApplicationData: []byte(param.ApplicationData),
}
resp, err := r.remotingClient.SendSyncRequest(request)
// ... 处理响应
}
```
---
## Issue 6: SessionManager 全局单例限制连接管理灵活性
### 问题位置
**文件**: `pkg/remoting/getty/session_manager.go`
### 问题代码
```go
var (
sessionManager *SessionManager
onceSessionManager = &sync.Once{}
)
type SessionManager struct {
// serverAddress -> rpc_client.Session -> bool
serverSessions sync.Map
allSessions sync.Map
sessionSize int32
gettyConf *config.Config
}
func initSessionManager(gettyConfig *config.Config) {
if sessionManager == nil {
onceSessionManager.Do(func() {
sessionManager = &SessionManager{
allSessions: sync.Map{},
serverSessions: sync.Map{},
gettyConf: gettyConfig,
}
sessionManager.init()
})
}
}
```
### 问题分析
1. **多集群支持困难**
- 全局只有一个 SessionManager
- 无法同时连接多个 Seata 集群
- 无法实现多租户隔离
2. **连接池管理混乱**
- Session 的生命周期管理不清晰
- 没有连接池大小限制
- 没有连接健康检查机制
3. **测试困难**
- 无法在测试中创建独立的 SessionManager
- 无法模拟连接失败、断线重连等场景
- 测试之间会共享连接
### 影响范围
- `pkg/remoting/getty/getty_remoting.go:53,67` - 远程调用
- `pkg/remoting/getty/listener.go` - 事件监听
- 所有与 TC 的网络通信
### 重构必要性
**理由**:
1. **多集群支持**: 企业环境可能需要连接多个集群
2. **连接管理**: 需要更精细的连接池管理
3. **测试覆盖**: 网络层测试必不可少
4. **故障恢复**: 需要更好的连接恢复机制
### 重构建议
```go
type SessionManager interface {
GetSession(msg interface{}) (getty.Session, error)
RegisterSession(session getty.Session)
ReleaseSession(session getty.Session)
Close() error
}
type DefaultSessionManager struct {
serverSessions sync.Map
allSessions sync.Map
sessionSize atomic.Int32
config *config.Config
loadBalancer loadbalance.LoadBalancer
}
func NewSessionManager(config *config.Config, lb loadbalance.LoadBalancer)
SessionManager {
sm := &DefaultSessionManager{
config: config,
loadBalancer: lb,
}
sm.init()
return sm
}
// 支持多集群
type MultiClusterSessionManager struct {
clusters map[string]SessionManager
mu sync.RWMutex
}
func (m *MultiClusterSessionManager) GetSessionManager(cluster string)
(SessionManager, error) {
m.mu.RLock()
defer m.mu.RUnlock()
if sm, ok := m.clusters[cluster]; ok {
return sm, nil
}
return nil, fmt.Errorf("cluster not found: %s", cluster)
}
```
---
## Issue 7: UndoLogParserCache 全局单例限制序列化扩展
### 问题位置
**文件**: `pkg/datasource/sql/undo/parser/parser_cache.go`
### 问题代码
```go
var (
once sync.Once
cache *UndoLogParserCache
)
type UndoLogParserCache struct {
// serializerName --> UndoLogParser
serializerNameToParser map[string]UndoLogParser
}
func initCache() {
cache = &UndoLogParserCache{
serializerNameToParser: make(map[string]UndoLogParser, 0),
}
cache.store(&JsonParser{})
cache.store(&ProtobufParser{})
}
func GetCache() *UndoLogParserCache {
once.Do(initCache)
return cache
}
```
### 问题分析
1. **扩展性差**
- 只支持 JSON 和 Protobuf 两种序列化方式
- 用户无法注册自定义序列化器
- 无法在运行时动态添加序列化器
2. **并发安全问题**
- `serializerNameToParser` 是普通 map,不是并发安全的
- 虽然初始化后是只读的,但没有明确保证
- 如果未来支持动态注册,会有并发问题
3. **测试困难**
- 无法在测试中使用简化的序列化器
- 无法测试序列化失败的场景
- 测试之间共享序列化器
### 影响范围
- `pkg/datasource/sql/undo/undo.go` - Undo 日志序列化
- `pkg/datasource/sql/undo/mysql/` - MySQL Undo 日志处理
- AT 模式的所有回滚操作
### 重构必要性
**理由**:
1. **扩展性**: 用户可能需要自定义序列化格式
2. **性能优化**: 不同场景可能需要不同的序列化器
3. **测试覆盖**: 需要测试序列化失败场景
4. **并发安全**: 需要明确的并发安全保证
### 重构建议
```go
type UndoLogParserRegistry interface {
Register(name string, parser UndoLogParser) error
Get(name string) (UndoLogParser, error)
GetDefault() (UndoLogParser, error)
}
type DefaultUndoLogParserRegistry struct {
parsers sync.Map // string -> UndoLogParser
defaultParser string
}
func NewUndoLogParserRegistry() UndoLogParserRegistry {
registry := &DefaultUndoLogParserRegistry{
defaultParser: "json",
}
// 注册默认解析器
registry.Register("json", &JsonParser{})
registry.Register("protobuf", &ProtobufParser{})
return registry
}
func (r *DefaultUndoLogParserRegistry) Register(name string, parser
UndoLogParser) error {
if parser == nil {
return errors.New("parser cannot be nil")
}
r.parsers.Store(name, parser)
return nil
}
func (r *DefaultUndoLogParserRegistry) Get(name string) (UndoLogParser,
error) {
if parser, ok := r.parsers.Load(name); ok {
return parser.(UndoLogParser), nil
}
return nil, fmt.Errorf("parser not found: %s", name)
}
```
### 📚 相关背景
_No response_
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]