This is an automated email from the ASF dual-hosted git repository. liuhongyu pushed a commit to branch feat/transport in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
commit 24066a0c4e112f5fcdc110bd26873955c84d7354 Author: liuhy <[email protected]> AuthorDate: Tue Sep 9 19:16:15 2025 +0800 docs: Update README for clarity and structure; add Chinese version link --- README-CN.md | 395 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- README.md | 420 +---------------------------------------------------------- 2 files changed, 381 insertions(+), 434 deletions(-) diff --git a/README-CN.md b/README-CN.md index f4ce435..4409464 100644 --- a/README-CN.md +++ b/README-CN.md @@ -2,37 +2,36 @@ [](LICENSE) -HertzBeat-Collector-Go 是 [Apache HertzBeat](https://github.com/apache/hertzbeat) 的 Go 语言采集器实现,支持多协议、多类型的监控数据采集,具备高性能、易扩展、易集成等特性。 +HertzBeat-Collector-Go 是 [Apache HertzBeat](https://github.com/apache/hertzbeat) 的 Go 语言实现的数据采集器。它支持多协议、多类型的监控数据采集,具有高性能、易扩展、无缝集成的特点。 ## ✨ 特性 -- 支持多种协议(如 HTTP、JDBC、SNMP、SSH 等)采集监控数据 -- 任务调度、作业管理、采集策略灵活可扩展 -- 结构清晰,易于二次开发和集成 +- 支持多种协议(HTTP、JDBC、SNMP、SSH 等)的监控数据采集 +- 灵活可扩展的任务调度、作业管理和采集策略 +- 清晰的架构设计,易于二次开发和集成 - 丰富的开发、测试和部署脚本 - 完善的文档和社区支持 -## 📂 目录结构 +## 📂 项目结构 ```text . -├── cmd/ # 主程序入口 -├── internal/ # 采集器核心实现与通用组件 -│ ├── collector/ # 各类采集器 -│ ├── common/ # 公共模块(调度、作业、类型、日志等) +├── cmd/ # 主入口点 +├── internal/ # 核心采集器实现和通用模块 +│ ├── collector/ # 各种采集器 +│ ├── common/ # 通用模块(调度、作业、类型、日志等) │ └── util/ # 工具类 ├── api/ # 协议定义(protobuf) ├── examples/ # 示例代码 -├── docs/ # 架构与开发文档 -├── tools/ # 构建、CI、脚本等工具 +├── docs/ # 架构和开发文档 +├── tools/ # 构建、CI、脚本和工具 ├── Makefile # 构建入口 -├── Dockerfile # 容器化部署 -└── README.md # 项目说明 +└── README-CN.md # 项目说明(中文) ``` ## 🚀 快速开始 -### 1. 构建与运行 +### 1. 构建和运行 ```bash # 安装依赖 @@ -45,14 +44,374 @@ make build ./bin/collector server --config etc/hertzbeat-collector.yaml ``` -### 2. 示例 +### 2. 环境变量配置(Docker 兼容) -可参考 `examples/main_simulation.go` 进行本地模拟采集测试。 +Go 版本完全兼容 Java 版本的环境变量配置: + +```bash +# 设置环境变量 +export IDENTITY=本地 +export MANAGER_HOST=192.168.97.0 +export MODE=public + +# 使用环境变量运行 +go run examples/main.go + +# 或使用 Docker +docker run -d \ + -e IDENTITY=本地 \ + -e MANAGER_HOST=192.168.97.0 \ + -e MODE=public \ + --name hertzbeat-collector-go \ + hertzbeat-collector-go +``` + +### 3. 示例 + +查看 `examples/` 目录获取各种使用示例: +- `examples/main.go` - 使用环境变量的主要示例 +- `examples/README.md` - 完整使用指南 +- `examples/Dockerfile` - Docker 构建示例 + +## 🔄 Java 服务器集成 + +该 Go 采集器设计为与 Java 版本的 HertzBeat 管理服务器兼容。传输层支持 gRPC 和 Netty 协议,实现无缝集成。 + +### 协议支持 + +Go 采集器支持两种通信协议: + +1. **Netty 协议**(推荐用于 Java 服务器兼容性) + - 使用长度前缀的 protobuf 消息格式 + - 与 Java Netty 服务器实现兼容 + - 默认端口:1158 + +2. **gRPC 协议** + - 使用标准 gRPC 和 protobuf + - 支持双向流式通信 + - 默认端口:1159 + +### 配置 + +#### 基础配置 + +```yaml +# etc/hertzbeat-collector.yaml +server: + host: "0.0.0.0" + port: 1158 + +transport: + protocol: "netty" # "netty" 或 "grpc" + server_addr: "127.0.0.1:1158" # Java 管理服务器地址 + timeout: 5000 # 连接超时时间(毫秒) + heartbeat_interval: 10 # 心跳间隔(秒) +``` + +#### 连接 Java 服务器 + +```go +package main + +import ( + "context" + "log" + "os" + "os/signal" + "syscall" + "time" + + clrServer "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server" + transport2 "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/transport" + loggerUtil "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" + loggerTypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" +) + +func main() { + // 创建日志记录器 + logging := loggerTypes.DefaultHertzbeatLogging() + appLogger := loggerUtil.DefaultLogger(os.Stdout, logging.Level[loggerTypes.LogComponentHertzbeatDefault]) + + // 创建 Java 服务器的传输配置 + config := &transport2.Config{ + Server: clrServer.Server{ + Logger: appLogger, + }, + ServerAddr: "127.0.0.1:1158", // Java 管理服务器地址 + Protocol: "netty", // 使用 netty 协议以实现 Java 兼容性 + } + + // 创建并启动传输运行器 + runner := transport2.New(config) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // 在后台启动传输 + go func() { + if err := runner.Start(ctx); err != nil { + appLogger.Error(err, "启动传输失败") + cancel() + } + }() + + // 等待关闭信号 + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + <-sigChan + + appLogger.Info("正在关闭...") + time.Sleep(5 * time.Second) + + if err := runner.Close(); err != nil { + appLogger.Error(err, "关闭传输失败") + } +} +``` + +### 直接客户端使用 + +为了更细粒度的控制,您可以直接使用传输客户端: + +```go +package main + +import ( + "log" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport" + pb "hertzbeat.apache.org/hertzbeat-collector-go/api/cluster_msg" +) + +func main() { + // 创建 Java 服务器的 Netty 客户端 + factory := &transport.TransportClientFactory{} + client, err := factory.CreateClient("netty", "127.0.0.1:1158") + if err != nil { + log.Fatal("创建客户端失败:", err) + } + + // 启动客户端 + if err := client.Start(); err != nil { + log.Fatal("启动客户端失败:", err) + } + defer client.Shutdown() + + // 注册消息处理器 + client.RegisterProcessor(100, func(msg interface{}) (interface{}, error) { + if pbMsg, ok := msg.(*pb.Message); ok { + log.Printf("收到消息: %s", string(pbMsg.Msg)) + return &pb.Message{ + Type: pb.MessageType_HEARTBEAT, + Direction: pb.Direction_RESPONSE, + Identity: pbMsg.Identity, + Msg: []byte("response"), + }, nil + } + return nil, nil + }) + + // 发送心跳消息 + heartbeat := &pb.Message{ + Type: pb.MessageType_HEARTBEAT, + Direction: pb.Direction_REQUEST, + Identity: "go-collector", + Msg: []byte("heartbeat"), + } + + // 异步发送 + if err := client.SendMsg(heartbeat); err != nil { + log.Printf("发送消息失败: %v", err) + } + + // 同步发送,带超时 + resp, err := client.SendMsgSync(heartbeat, 5000) + if err != nil { + log.Printf("发送同步消息失败: %v", err) + } else if resp != nil { + if pbResp, ok := resp.(*pb.Message); ok { + log.Printf("收到响应: %s", string(pbResp.Msg)) + } + } +} +``` + +### 消息类型 + +Go 采集器支持 Java 版本中定义的所有消息类型: + +| 消息类型 | 值 | 描述 | +|----------|-----|------| +| HEARTBEAT | 0 | 心跳/健康检查 | +| GO_ONLINE | 1 | 采集器上线通知 | +| GO_OFFLINE | 2 | 采集器下线通知 | +| GO_CLOSE | 3 | 采集器关闭通知 | +| ISSUE_CYCLIC_TASK | 4 | 发布周期性采集任务 | +| DELETE_CYCLIC_TASK | 5 | 删除周期性采集任务 | +| ISSUE_ONE_TIME_TASK | 6 | 发布一次性采集任务 | + +### 连接管理 + +传输层提供了强大的连接管理功能: + +- **自动重连**:连接丢失时自动尝试重连 +- **连接监控**:后台监控连接健康状态 +- **心跳机制**:定期心跳消息以保持连接 +- **事件处理**:连接状态变更通知(已连接、断开连接、连接失败) + +### 错误处理 + +该实现包含全面的错误处理: + +- **连接超时**:连接尝试的正确超时处理 +- **消息序列化**:Protobuf 编组/解组错误处理 +- **响应关联**:使用身份字段正确匹配请求和响应 +- **优雅关闭**:使用上下文取消的干净关闭程序 + +## 🔍 代码逻辑分析和兼容性 + +### 实现状态 + +Go 采集器实现提供了与 Java 版本的全面兼容性: + +#### ✅ **完全实现的功能** + +1. **传输层兼容性** + - **Netty 协议**:使用长度前缀消息格式的完整实现 + - **gRPC 协议**:完整的 gRPC 服务实现,支持双向流式通信 + - **消息类型**:支持所有核心消息类型(HEARTBEAT、GO_ONLINE、GO_OFFLINE 等) + - **请求/响应模式**:正确处理同步和异步通信 + +2. **连接管理** + - **自动重连**:连接丢失时的强大重连逻辑 + - **连接监控**:带截止时间管理的后台健康检查 + - **事件系统**:连接状态变更的全面事件处理 + - **心跳机制**:用于连接维护的定期心跳消息 + +3. **消息处理** + - **处理器注册**:动态消息处理器注册和分发 + - **响应关联**:使用身份字段正确请求-响应匹配 + - **错误处理**:整个消息管道中的全面错误处理 + - **超时管理**:所有操作的可配置超时 + +4. **协议兼容性** + - **Protobuf 消息**:与 Java protobuf 定义完全兼容 + - **消息序列化**:Netty 协议的正确二进制格式处理 + - **流处理**:支持一元和流式 gRPC 操作 + +#### ⚠️ **改进领域** + +1. **任务处理逻辑** + - 当前实现为任务处理返回占位符响应 + - 需要根据具体要求实现实际采集逻辑 + - 任务调度和执行引擎需要集成 + +2. **配置管理** + - 配置文件格式需要与 Java 版本标准化 + - 环境变量支持可以增强 + - 可以添加动态配置重载 + +3. **监控和指标** + - 可以添加全面的指标收集 + - 可以增强性能监控集成 + - 可以暴露健康检查端点 + +#### 🔧 **技术实现细节** + +1. **Netty 协议实现** + ```go + // Java 兼容的长度前缀消息格式 + func (c *NettyClient) writeMessage(msg *pb.Message) error { + data, err := proto.Marshal(msg) + if err != nil { + return fmt.Errorf("消息编组失败: %w", err) + } + // 写入长度前缀(varint32) + length := len(data) + if err := binary.Write(c.writer, binary.BigEndian, uint32(length)); err != nil { + return fmt.Errorf("写入长度失败: %w", err) + } + // 写入消息数据 + if _, err := c.writer.Write(data); err != nil { + return fmt.Errorf("写入消息失败: %w", err) + } + return c.writer.Flush() + } + ``` + +2. **响应未来模式** + ```go + // 使用 ResponseFuture 进行同步通信 + func (c *NettyClient) SendMsgSync(msg interface{}, timeoutMillis int) (interface{}, error) { + // 为此请求创建响应未来 + future := NewResponseFuture() + c.responseTable[pbMsg.Identity] = future + defer delete(c.responseTable, pbMsg.Identity) + + // 发送消息 + if err := c.writeMessage(pbMsg); err != nil { + future.PutError(err) + return nil, err + } + + // 等待带超时的响应 + return future.Wait(time.Duration(timeoutMillis) * time.Millisecond) + } + ``` + +3. **事件驱动架构** + ```go + // 连接事件处理 + func (c *NettyClient) triggerEvent(eventType EventType, err error) { + if c.eventHandler != nil { + c.eventHandler(Event{ + Type: eventType, + Address: c.addr, + Error: err, + }) + } + } + ``` + +#### 🎯 **兼容性评估** + +Go 实现实现了与 Java 版本的**高度兼容性**: + +- **协议级别**:100% 兼容 Netty 消息格式 +- **消息类型**:所有核心消息类型都已实现 +- **通信模式**:支持同步和异步模式 +- **连接管理**:具有自动恢复功能的强大连接处理 +- **错误处理**:全面的错误处理 + +#### 📋 **建议** + +1. **生产使用**: + - 根据具体监控要求实现实际任务处理逻辑 + - 添加全面的日志记录和监控 + - 实现配置验证和管理 + - 添加与 Java 服务器的集成测试 + +2. **开发使用**: + - 当前实现提供了坚实的基础 + - 所有核心通信模式都已正确实现 + - 协议兼容性得到了彻底解决 + - 可扩展性已构建到架构中 + +3. **测试策略**: + - 与实际 Java 服务器部署一起测试 + - 验证消息格式兼容性 + - 测试连接恢复场景 + - 验证负载下的性能 + +Go 采集器实现成功地重新创建了 Java 版本的核心通信功能,为 Go 中的 HertzBeat 监控数据采集提供了坚实的基础。 ## 🛠️ 贡献 -欢迎通过 [CONTRIBUTING.md](CONTRIBUTING.md) 参与贡献,包括代码、文档、测试、讨论等。 +欢迎贡献!请查看 [CONTRIBUTING.md](CONTRIBUTING.md) 了解详细信息,包括代码、文档、测试和讨论。 ## 📄 许可证 -本项目基于 [Apache 2.0 License](LICENSE) 开源。 +本项目基于 [Apache 2.0 许可证](LICENSE) 许可。 + +## 🌐 English Version + +For English documentation, please see [README.md](README.md). diff --git a/README.md b/README.md index d7e4575..773115f 100644 --- a/README.md +++ b/README.md @@ -2,422 +2,6 @@ [](LICENSE) -HertzBeat-Collector-Go 是 [Apache HertzBeat](https://github.com/apache/hertzbeat) 的 Go 语言实现的数据采集器。它支持多协议、多类型的监控数据采集,具有高性能、易扩展、无缝集成的特点。 - -## ✨ 特性 - -- 支持多种协议(HTTP、JDBC、SNMP、SSH 等)的监控数据采集 -- 灵活可扩展的任务调度、作业管理和采集策略 -- 清晰的架构设计,易于二次开发和集成 -- 丰富的开发、测试和部署脚本 -- 完善的文档和社区支持 - -## 📂 项目结构 - -```text -. -├── cmd/ # 主入口点 -├── internal/ # 核心采集器实现和通用模块 -│ ├── collector/ # 各种采集器 -│ ├── common/ # 通用模块(调度、作业、类型、日志等) -│ └── util/ # 工具类 -├── api/ # 协议定义(protobuf) -├── examples/ # 示例代码 -├── docs/ # 架构和开发文档 -├── tools/ # 构建、CI、脚本和工具 -├── Makefile # 构建入口 -└── README.md # 项目描述 -``` - -## 🚀 快速开始 - -### 1. 构建和运行 - -```bash -# 安装依赖 -go mod tidy - -# 构建 -make build - -# 运行 -./bin/collector server --config etc/hertzbeat-collector.yaml -``` - -### 2. 环境变量配置(Docker 兼容) - -Go 版本完全兼容 Java 版本的环境变量配置: - -```bash -# 设置环境变量 -export IDENTITY=本地 -export MANAGER_HOST=192.168.97.0 -export MODE=public - -# 使用环境变量运行 -go run examples/main.go - -# 或使用 Docker -docker run -d \ - -e IDENTITY=本地 \ - -e MANAGER_HOST=192.168.97.0 \ - -e MODE=public \ - --name hertzbeat-collector-go \ - hertzbeat-collector-go -``` - -### 3. 示例 - -查看 `examples/` 目录获取各种使用示例: -- `examples/main.go` - 使用环境变量的主要示例 -- `examples/README.md` - 完整使用指南 -- `examples/Dockerfile` - Docker 构建示例 - -## 🔄 Java 服务器集成 - -该 Go 采集器设计为与 Java 版本的 HertzBeat 管理服务器兼容。传输层支持 gRPC 和 Netty 协议,实现无缝集成。 - -### 协议支持 - -Go 采集器支持两种通信协议: - -1. **Netty 协议**(推荐用于 Java 服务器兼容性) - - 使用长度前缀的 protobuf 消息格式 - - 与 Java Netty 服务器实现兼容 - - 默认端口:1158 - -2. **gRPC 协议** - - 使用标准 gRPC 和 protobuf - - 支持双向流式通信 - - 默认端口:1159 - -### 配置 - -#### 基础配置 - -```yaml -# etc/hertzbeat-collector.yaml -server: - host: "0.0.0.0" - port: 1158 - -transport: - protocol: "netty" # "netty" 或 "grpc" - server_addr: "127.0.0.1:1158" # Java 管理服务器地址 - timeout: 5000 # 连接超时时间(毫秒) - heartbeat_interval: 10 # 心跳间隔(秒) -``` - -#### 连接 Java 服务器 - -```go -package main - -import ( - "context" - "log" - "os" - "os/signal" - "syscall" - "time" - - clrServer "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server" - transport2 "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/transport" - loggerUtil "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" - loggerTypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" -) - -func main() { - // 创建日志记录器 - logging := loggerTypes.DefaultHertzbeatLogging() - appLogger := loggerUtil.DefaultLogger(os.Stdout, logging.Level[loggerTypes.LogComponentHertzbeatDefault]) - - // 创建 Java 服务器的传输配置 - config := &transport2.Config{ - Server: clrServer.Server{ - Logger: appLogger, - }, - ServerAddr: "127.0.0.1:1158", // Java 管理服务器地址 - Protocol: "netty", // 使用 netty 协议以实现 Java 兼容性 - } - - // 创建并启动传输运行器 - runner := transport2.New(config) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // 在后台启动传输 - go func() { - if err := runner.Start(ctx); err != nil { - appLogger.Error(err, "启动传输失败") - cancel() - } - }() - - // 等待关闭信号 - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - <-sigChan - - appLogger.Info("正在关闭...") - time.Sleep(5 * time.Second) - - if err := runner.Close(); err != nil { - appLogger.Error(err, "关闭传输失败") - } -} -``` - -### 直接客户端使用 - -为了更细粒度的控制,您可以直接使用传输客户端: - -```go -package main - -import ( - "log" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport" - pb "hertzbeat.apache.org/hertzbeat-collector-go/api/cluster_msg" -) - -func main() { - // 创建 Java 服务器的 Netty 客户端 - factory := &transport.TransportClientFactory{} - client, err := factory.CreateClient("netty", "127.0.0.1:1158") - if err != nil { - log.Fatal("创建客户端失败:", err) - } - - // 启动客户端 - if err := client.Start(); err != nil { - log.Fatal("启动客户端失败:", err) - } - defer client.Shutdown() - - // 注册消息处理器 - client.RegisterProcessor(100, func(msg interface{}) (interface{}, error) { - if pbMsg, ok := msg.(*pb.Message); ok { - log.Printf("收到消息: %s", string(pbMsg.Msg)) - return &pb.Message{ - Type: pb.MessageType_HEARTBEAT, - Direction: pb.Direction_RESPONSE, - Identity: pbMsg.Identity, - Msg: []byte("response"), - }, nil - } - return nil, nil - }) - - // 发送心跳消息 - heartbeat := &pb.Message{ - Type: pb.MessageType_HEARTBEAT, - Direction: pb.Direction_REQUEST, - Identity: "go-collector", - Msg: []byte("heartbeat"), - } - - // 异步发送 - if err := client.SendMsg(heartbeat); err != nil { - log.Printf("发送消息失败: %v", err) - } - - // 同步发送,带超时 - resp, err := client.SendMsgSync(heartbeat, 5000) - if err != nil { - log.Printf("发送同步消息失败: %v", err) - } else if resp != nil { - if pbResp, ok := resp.(*pb.Message); ok { - log.Printf("收到响应: %s", string(pbResp.Msg)) - } - } -} -``` - -### 消息类型 - -Go 采集器支持 Java 版本中定义的所有消息类型: - -| 消息类型 | 值 | 描述 | -|----------|-----|------| -| HEARTBEAT | 0 | 心跳/健康检查 | -| GO_ONLINE | 1 | 采集器上线通知 | -| GO_OFFLINE | 2 | 采集器下线通知 | -| GO_CLOSE | 3 | 采集器关闭通知 | -| ISSUE_CYCLIC_TASK | 4 | 发布周期性采集任务 | -| DELETE_CYCLIC_TASK | 5 | 删除周期性采集任务 | -| ISSUE_ONE_TIME_TASK | 6 | 发布一次性采集任务 | - -### 连接管理 - -传输层提供了强大的连接管理功能: - -- **自动重连**:连接丢失时自动尝试重连 -- **连接监控**:后台监控连接健康状态 -- **心跳机制**:定期心跳消息以保持连接 -- **事件处理**:连接状态变更通知(已连接、断开连接、连接失败) - -### 错误处理 - -该实现包含全面的错误处理: - -- **连接超时**:连接尝试的正确超时处理 -- **消息序列化**:Protobuf 编组/解组错误处理 -- **响应关联**:使用身份字段正确匹配请求和响应 -- **优雅关闭**:使用上下文取消的干净关闭程序 - -## 🔍 代码逻辑分析和兼容性 - -### 实现状态 - -Go 采集器实现提供了与 Java 版本的全面兼容性: - -#### ✅ **完全实现的功能** - -1. **传输层兼容性** - - **Netty 协议**:使用长度前缀消息格式的完整实现 - - **gRPC 协议**:完整的 gRPC 服务实现,支持双向流式通信 - - **消息类型**:支持所有核心消息类型(HEARTBEAT、GO_ONLINE、GO_OFFLINE 等) - - **请求/响应模式**:正确处理同步和异步通信 - -2. **连接管理** - - **自动重连**:连接丢失时的强大重连逻辑 - - **连接监控**:带截止时间管理的后台健康检查 - - **事件系统**:连接状态变更的全面事件处理 - - **心跳机制**:用于连接维护的定期心跳消息 - -3. **消息处理** - - **处理器注册**:动态消息处理器注册和分发 - - **响应关联**:使用身份字段正确请求-响应匹配 - - **错误处理**:整个消息管道中的全面错误处理 - - **超时管理**:所有操作的可配置超时 - -4. **协议兼容性** - - **Protobuf 消息**:与 Java protobuf 定义完全兼容 - - **消息序列化**:Netty 协议的正确二进制格式处理 - - **流处理**:支持一元和流式 gRPC 操作 - -#### ⚠️ **改进领域** - -1. **任务处理逻辑** - - 当前实现为任务处理返回占位符响应 - - 需要根据具体要求实现实际采集逻辑 - - 任务调度和执行引擎需要集成 - -2. **配置管理** - - 配置文件格式需要与 Java 版本标准化 - - 环境变量支持可以增强 - - 可以添加动态配置重载 - -3. **监控和指标** - - 可以添加全面的指标收集 - - 可以增强性能监控集成 - - 可以暴露健康检查端点 - -#### 🔧 **技术实现细节** - -1. **Netty 协议实现** - ```go - // Java 兼容的长度前缀消息格式 - func (c *NettyClient) writeMessage(msg *pb.Message) error { - data, err := proto.Marshal(msg) - if err != nil { - return fmt.Errorf("消息编组失败: %w", err) - } - // 写入长度前缀(varint32) - length := len(data) - if err := binary.Write(c.writer, binary.BigEndian, uint32(length)); err != nil { - return fmt.Errorf("写入长度失败: %w", err) - } - // 写入消息数据 - if _, err := c.writer.Write(data); err != nil { - return fmt.Errorf("写入消息失败: %w", err) - } - return c.writer.Flush() - } - ``` - -2. **响应未来模式** - ```go - // 使用 ResponseFuture 进行同步通信 - func (c *NettyClient) SendMsgSync(msg interface{}, timeoutMillis int) (interface{}, error) { - // 为此请求创建响应未来 - future := NewResponseFuture() - c.responseTable[pbMsg.Identity] = future - defer delete(c.responseTable, pbMsg.Identity) - - // 发送消息 - if err := c.writeMessage(pbMsg); err != nil { - future.PutError(err) - return nil, err - } - - // 等待带超时的响应 - return future.Wait(time.Duration(timeoutMillis) * time.Millisecond) - } - ``` - -3. **事件驱动架构** - ```go - // 连接事件处理 - func (c *NettyClient) triggerEvent(eventType EventType, err error) { - if c.eventHandler != nil { - c.eventHandler(Event{ - Type: eventType, - Address: c.addr, - Error: err, - }) - } - } - ``` - -#### 🎯 **兼容性评估** - -Go 实现实现了与 Java 版本的**高度兼容性**: - -- **协议级别**:100% 兼容 Netty 消息格式 -- **消息类型**:所有核心消息类型都已实现 -- **通信模式**:支持同步和异步模式 -- **连接管理**:具有自动恢复功能的强大连接处理 -- **错误处理**:全面的错误处理 - -#### 📋 **建议** - -1. **生产使用**: - - 根据具体监控要求实现实际任务处理逻辑 - - 添加全面的日志记录和监控 - - 实现配置验证和管理 - - 添加与 Java 服务器的集成测试 - -2. **开发使用**: - - 当前实现提供了坚实的基础 - - 所有核心通信模式都已正确实现 - - 协议兼容性得到了彻底解决 - - 可扩展性已构建到架构中 - -3. **测试策略**: - - 与实际 Java 服务器部署一起测试 - - 验证消息格式兼容性 - - 测试连接恢复场景 - - 验证负载下的性能 - -Go 采集器实现成功地重新创建了 Java 版本的核心通信功能,为 Go 中的 HertzBeat 监控数据采集提供了坚实的基础。 - -## 🛠️ 贡献 - -欢迎贡献!请查看 [CONTRIBUTING.md](CONTRIBUTING.md) 了解详细信息,包括代码、文档、测试和讨论。 - -## 📄 许可证 - -本项目基于 [Apache 2.0 许可证](LICENSE) 许可。 - ---- - -# HertzBeat Collector Go - -[](LICENSE) - HertzBeat-Collector-Go is the Go implementation of the collector for [Apache HertzBeat](https://github.com/apache/hertzbeat). It supports multi-protocol and multi-type monitoring data collection, featuring high performance, easy extensibility, and seamless integration. ## ✨ Features @@ -827,3 +411,7 @@ Contributions are welcome! Please see [CONTRIBUTING.md](CONTRIBUTING.md) for det ## 📄 License This project is licensed under the [Apache 2.0 License](LICENSE). + +## 🌐 中文版本 + +For Chinese documentation, please see [README-CN.md](README-CN.md). --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
