This is an automated email from the ASF dual-hosted git repository. shown pushed a commit to branch 1014-yuluo/update-readme in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
commit e3f2011601fb8116428397645b38c35c9998a9b3 Author: yuluo-yx <[email protected]> AuthorDate: Tue Oct 14 22:32:50 2025 +0800 docs: refactor readme Signed-off-by: yuluo-yx <[email protected]> --- README-CN.md | 639 +---------------------------------------------------------- README.md | 639 +---------------------------------------------------------- 2 files changed, 12 insertions(+), 1266 deletions(-) diff --git a/README-CN.md b/README-CN.md index 81c7931..a0390a9 100644 --- a/README-CN.md +++ b/README-CN.md @@ -5,647 +5,20 @@ HertzBeat-Collector-Go 是 [Apache HertzBeat](https://github.com/apache/hertzbeat) 的 Go 语言实现的数据采集器。它支持多协议、多类型的监控数据采集,具有高性能、易扩展、无缝集成的特点。 -## ✨ 特性 +## 快速开始 -- 支持多种协议(HTTP、JDBC、SNMP、SSH 等)的监控数据采集 -- 灵活可扩展的任务调度、作业管理和采集策略 -- 清晰的架构设计,易于二次开发和集成 -- 丰富的开发、测试和部署脚本 -- 完善的文档和社区支持 - -## 📂 项目结构 - -```text -. -├── cmd/ # 主入口点 -├── internal/ # 核心采集器实现和通用模块 -│ ├── collector/ # 各种采集器 -│ ├── common/ # 通用模块(调度、作业、类型、日志等) -│ └── util/ # 工具类 -├── api/ # 协议定义(protobuf) -├── examples/ # 示例代码 -├── docs/ # 架构和开发文档 -├── tools/ # 构建、CI、脚本和工具 -├── Makefile # 构建入口 -└── README-CN.md # 项目说明(中文) -``` - -## � 配置架构 - -### 统一配置系统 - -采集器实现了统一配置系统,包含三个主要组件: - -#### 1. ConfigFactory(配置工厂) - -中央配置工厂提供: - -- 默认值管理 -- 环境变量处理 -- 配置验证 -- 配置操作工具方法 - -```go -// 创建带默认值的配置 -factory := config.NewConfigFactory() -cfg := factory.CreateDefaultConfig() - -// 从环境变量创建配置 -envCfg := factory.CreateFromEnv() - -// 合并文件配置与环境变量覆盖 -mergedCfg := factory.MergeWithEnv(fileCfg) - -// 验证配置 -if err := factory.ValidateConfig(cfg); err != nil { - log.Fatal("配置无效:", err) -} -``` - -#### 2. 配置入口点 - -针对不同用例的三个独立入口点: - -- **`config.LoadFromFile(path)`**: 仅文件配置加载 -- **`config.LoadFromEnv()`**: 仅环境变量配置加载 -- **`config.LoadUnified(path)`**: 组合文件 + 环境变量加载(推荐) - -#### 3. 配置结构 - -```go -type CollectorConfig struct { - Collector CollectorSection `yaml:"collector"` -} - -type CollectorSection struct { - Info CollectorInfo `yaml:"info"` - Log CollectorLogConfig `yaml:"log"` - Manager ManagerConfig `yaml:"manager"` - Identity string `yaml:"identity"` - Mode string `yaml:"mode"` -} - -type ManagerConfig struct { - Host string `yaml:"host"` - Port string `yaml:"port"` - Protocol string `yaml:"protocol"` -} +```shell +make run ``` -#### 4. 配置验证 - -系统包含全面的验证: - -- **必需字段**: 身份、模式、管理器主机/端口 -- **值验证**: 端口号、协议类型、模式值 -- **格式验证**: IP 地址、日志级别 - -#### 5. 默认值 - -| 字段 | 默认值 | 描述 | -| ---------------- | ------------------------ | -------------- | -| Identity | `hertzbeat-collector-go` | 采集器标识符 | -| Mode | `public` | 采集器模式 | -| Collector.Name | `hertzbeat-collector-go` | 采集器服务名称 | -| Collector.IP | `127.0.0.1` | 采集器绑定地址 | -| Collector.Port | `8080` | 采集器服务端口 | -| Manager.Host | `127.0.0.1` | 管理服务器主机 | -| Manager.Port | `1158` | 管理服务器端口 | -| Manager.Protocol | `netty` | 通信协议 | -| Log.Level | `info` | 日志级别 | - -### 从旧配置迁移 - -如果您有现有配置,以下是迁移方法: - -#### 旧格式 (transport.yaml) - -```yaml -server: - host: "0.0.0.0" - port: 1158 -transport: - protocol: "netty" - server_addr: "127.0.0.1:1158" -``` - -#### 新格式 (hertzbeat-collector.yaml) - -```yaml -collector: - info: - name: hertzbeat-collector-go - ip: 0.0.0.0 - port: 8080 - manager: - host: 127.0.0.1 - port: 1158 - protocol: netty - identity: hertzbeat-collector-go - mode: public -``` - -## �🚀 快速开始 - -### 1. 构建和运行 - -```bash -# 安装依赖 -go mod tidy - -# 构建 -make build - -# 运行 -./bin/collector server --config etc/hertzbeat-collector.yaml -``` - -### 2. 配置选项 - -采集器支持多种配置方法,具有统一的配置系统: - -#### 基于文件的配置 - -```bash -# 使用配置文件运行 -./bin/collector server --config etc/hertzbeat-collector.yaml -``` - -配置文件示例(`etc/hertzbeat-collector.yaml`): - -```yaml -collector: - info: - name: hertzbeat-collector-go - ip: 127.0.0.1 - port: 8080 - - log: - level: debug - - # 管理器/传输配置 - manager: - host: 127.0.0.1 - port: 1158 - protocol: netty - - # 采集器身份和模式 - identity: hertzbeat-collector-go - mode: public -``` - -#### 环境变量配置(Docker 兼容) - -Go 版本完全兼容 Java 版本的环境变量配置: - -```bash -# 设置环境变量 -export IDENTITY=local -export COLLECTOR_NAME=hertzbeat-collector-go -export COLLECTOR_IP=127.0.0.1 -export COLLECTOR_PORT=8080 -export MANAGER_HOST=192.168.97.0 -export MANAGER_PORT=1158 -export MANAGER_PROTOCOL=grpc -export MODE=public -export LOG_LEVEL=info - -# 使用环境变量运行 -./bin/collector server - -# 或使用 Docker -docker run -d \ - -e IDENTITY=local \ - -e MANAGER_HOST=192.168.97.0 \ - -e MANAGER_PORT=1158 \ - -e MANAGER_PROTOCOL=grpc \ - -e MODE=public \ - --name hertzbeat-collector-go \ - hertzbeat-collector-go -``` - -#### 统一配置(推荐) - -采集器使用统一配置系统,支持文件和环境变量配置: - -- **文件 + 环境变量**:环境变量覆盖文件设置 -- **仅环境变量**:纯环境变量配置 -- **仅文件**:纯基于文件的配置 - -配置优先级(从高到低): - -1. 环境变量 -2. 配置文件值 -3. 内置默认值 - -#### 支持的环境变量 - -| 环境变量 | 描述 | 默认值 | -| ------------------ | -------------------------------- | ------------------------ | -| `IDENTITY` | 采集器身份 | `hertzbeat-collector-go` | -| `MODE` | 采集器模式(`public`/`private`) | `public` | -| `COLLECTOR_NAME` | 采集器名称 | `hertzbeat-collector-go` | -| `COLLECTOR_IP` | 采集器绑定 IP | `127.0.0.1` | -| `COLLECTOR_PORT` | 采集器绑定端口 | `8080` | -| `MANAGER_HOST` | 管理服务器主机 | `127.0.0.1` | -| `MANAGER_PORT` | 管理服务器端口 | `1158` | -| `MANAGER_PROTOCOL` | 协议(`netty`/`grpc`) | `netty` | -| `LOG_LEVEL` | 日志级别 | `info` | - -### 3. 示例 - -查看 `examples/` 目录获取各种使用示例: - -- `examples/main.go` - 使用环境变量的主要示例 -- `examples/README.md` - 完整使用指南 -- `examples/Dockerfile` - Docker 构建示例 - -## 🔄 Java 服务器集成 - -该 Go 采集器设计为与 Java 版本的 HertzBeat 管理服务器兼容。传输层支持 gRPC 和 Netty 协议,实现无缝集成。 - -### 协议支持 - -Go 采集器支持两种通信协议: - -1. **Netty 协议**(推荐用于 Java 服务器兼容性) - - - 使用长度前缀的 protobuf 消息格式 - - 与 Java Netty 服务器实现兼容 - - 默认端口:1158 - -2. **gRPC 协议** - - 使用标准 gRPC 和 protobuf - - 支持双向流式通信 - - 默认端口:1159 - -### 配置 - -#### 基础配置 - -采集器通过多种入口点支持灵活配置: - -```yaml -# etc/hertzbeat-collector.yaml -collector: - info: - name: hertzbeat-collector-go - ip: 127.0.0.1 - port: 8080 - - log: - level: debug - - # 管理器/传输配置 - manager: - host: 127.0.0.1 - port: 1158 - protocol: netty - - # 采集器身份和模式 - identity: hertzbeat-collector-go - mode: public -``` - -#### 配置加载方法 - -采集器提供三种配置加载方法: - -1. **仅文件配置**: - - ```go - import "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config" - - cfg, err := config.LoadFromFile("etc/hertzbeat-collector.yaml") - if err != nil { - log.Fatal("配置加载失败:", err) - } - ``` - -2. **仅环境变量配置**: - - ```go - import "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config" - - cfg := config.LoadFromEnv() - ``` - -3. **统一配置(推荐)**: - - ```go - import "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config" - - // 环境变量覆盖文件值 - cfg, err := config.LoadUnified("etc/hertzbeat-collector.yaml") - if err != nil { - log.Fatal("配置加载失败:", err) - } - ``` - -#### 连接 Java 服务器 - -```go -package main - -import ( - "context" - "log" - "os" - "os/signal" - "syscall" - "time" - - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/transport" -) - -func main() { - // 使用统一加载器加载配置(文件 + 环境变量) - cfg, err := config.LoadUnified("etc/hertzbeat-collector.yaml") - if err != nil { - log.Fatal("配置加载失败:", err) - } - - // 使用统一配置创建传输运行器 - runner := transport.New(cfg) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // 在后台启动传输 - go func() { - if err := runner.Start(ctx); err != nil { - log.Printf("启动传输失败: %v", err) - cancel() - } - }() - - // 等待关闭信号 - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - <-sigChan - - log.Println("正在关闭...") - - if err := runner.Close(); err != nil { - log.Printf("关闭传输失败: %v", err) - } -} -``` - -### 直接客户端使用 - -为了更细粒度的控制,您可以直接使用传输客户端: - -```go -package main - -import ( - "log" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport" - pb "hertzbeat.apache.org/hertzbeat-collector-go/api/cluster_msg" -) - -func main() { - // 创建 Java 服务器的 Netty 客户端 - factory := &transport.TransportClientFactory{} - client, err := factory.CreateClient("netty", "127.0.0.1:1158") - if err != nil { - log.Fatal("创建客户端失败:", err) - } - - // 启动客户端 - if err := client.Start(); err != nil { - log.Fatal("启动客户端失败:", err) - } - defer client.Shutdown() - - // 注册消息处理器 - client.RegisterProcessor(100, func(msg interface{}) (interface{}, error) { - if pbMsg, ok := msg.(*pb.Message); ok { - log.Printf("收到消息: %s", string(pbMsg.Msg)) - return &pb.Message{ - Type: pb.MessageType_HEARTBEAT, - Direction: pb.Direction_RESPONSE, - Identity: pbMsg.Identity, - Msg: []byte("response"), - }, nil - } - return nil, nil - }) - - // 发送心跳消息 - heartbeat := &pb.Message{ - Type: pb.MessageType_HEARTBEAT, - Direction: pb.Direction_REQUEST, - Identity: "go-collector", - Msg: []byte("heartbeat"), - } - - // 异步发送 - if err := client.SendMsg(heartbeat); err != nil { - log.Printf("发送消息失败: %v", err) - } - - // 同步发送,带超时 - resp, err := client.SendMsgSync(heartbeat, 5000) - if err != nil { - log.Printf("发送同步消息失败: %v", err) - } else if resp != nil { - if pbResp, ok := resp.(*pb.Message); ok { - log.Printf("收到响应: %s", string(pbResp.Msg)) - } - } -} -``` - -### 消息类型 - -Go 采集器支持 Java 版本中定义的所有消息类型: - -| 消息类型 | 值 | 描述 | -| ------------------- | --- | ------------------ | -| HEARTBEAT | 0 | 心跳/健康检查 | -| GO_ONLINE | 1 | 采集器上线通知 | -| GO_OFFLINE | 2 | 采集器下线通知 | -| GO_CLOSE | 3 | 采集器关闭通知 | -| ISSUE_CYCLIC_TASK | 4 | 发布周期性采集任务 | -| DELETE_CYCLIC_TASK | 5 | 删除周期性采集任务 | -| ISSUE_ONE_TIME_TASK | 6 | 发布一次性采集任务 | - -### 连接管理 - -传输层提供了强大的连接管理功能: - -- **自动重连**:连接丢失时自动尝试重连 -- **连接监控**:后台监控连接健康状态 -- **心跳机制**:定期心跳消息以保持连接 -- **事件处理**:连接状态变更通知(已连接、断开连接、连接失败) - -### 错误处理 - -该实现包含全面的错误处理: - -- **连接超时**:连接尝试的正确超时处理 -- **消息序列化**:Protobuf 编组/解组错误处理 -- **响应关联**:使用身份字段正确匹配请求和响应 -- **优雅关闭**:使用上下文取消的干净关闭程序 - -## 🔍 代码逻辑分析和兼容性 - -### 实现状态 - -Go 采集器实现提供了与 Java 版本的全面兼容性: - -#### ✅ **完全实现的功能** - -1. **传输层兼容性** - - - **Netty 协议**:使用长度前缀消息格式的完整实现 - - **gRPC 协议**:完整的 gRPC 服务实现,支持双向流式通信 - - **消息类型**:支持所有核心消息类型(HEARTBEAT、GO_ONLINE、GO_OFFLINE 等) - - **请求/响应模式**:正确处理同步和异步通信 - -2. **连接管理** - - - **自动重连**:连接丢失时的强大重连逻辑 - - **连接监控**:带截止时间管理的后台健康检查 - - **事件系统**:连接状态变更的全面事件处理 - - **心跳机制**:用于连接维护的定期心跳消息 - -3. **消息处理** - - - **处理器注册**:动态消息处理器注册和分发 - - **响应关联**:使用身份字段正确请求-响应匹配 - - **错误处理**:整个消息管道中的全面错误处理 - - **超时管理**:所有操作的可配置超时 - -4. **协议兼容性** - - **Protobuf 消息**:与 Java protobuf 定义完全兼容 - - **消息序列化**:Netty 协议的正确二进制格式处理 - - **流处理**:支持一元和流式 gRPC 操作 - -#### ⚠️ **改进领域** - -1. **任务处理逻辑** - - - 当前实现为任务处理返回占位符响应 - - 需要根据具体要求实现实际采集逻辑 - - 任务调度和执行引擎需要集成 - -2. **配置管理** - - - 配置文件格式需要与 Java 版本标准化 - - 环境变量支持可以增强 - - 可以添加动态配置重载 - -3. **监控和指标** - - 可以添加全面的指标收集 - - 可以增强性能监控集成 - - 可以暴露健康检查端点 - -#### 🔧 **技术实现细节** - -1. **Netty 协议实现** - - ```go - // Java 兼容的长度前缀消息格式 - func (c *NettyClient) writeMessage(msg *pb.Message) error { - data, err := proto.Marshal(msg) - if err != nil { - return fmt.Errorf("消息编组失败: %w", err) - } - // 写入长度前缀(varint32) - length := len(data) - if err := binary.Write(c.writer, binary.BigEndian, uint32(length)); err != nil { - return fmt.Errorf("写入长度失败: %w", err) - } - // 写入消息数据 - if _, err := c.writer.Write(data); err != nil { - return fmt.Errorf("写入消息失败: %w", err) - } - return c.writer.Flush() - } - ``` - -2. **响应未来模式** - - ```go - // 使用 ResponseFuture 进行同步通信 - func (c *NettyClient) SendMsgSync(msg interface{}, timeoutMillis int) (interface{}, error) { - // 为此请求创建响应未来 - future := NewResponseFuture() - c.responseTable[pbMsg.Identity] = future - defer delete(c.responseTable, pbMsg.Identity) - - // 发送消息 - if err := c.writeMessage(pbMsg); err != nil { - future.PutError(err) - return nil, err - } - - // 等待带超时的响应 - return future.Wait(time.Duration(timeoutMillis) * time.Millisecond) - } - ``` - -3. **事件驱动架构** - - ```go - // 连接事件处理 - func (c *NettyClient) triggerEvent(eventType EventType, err error) { - if c.eventHandler != nil { - c.eventHandler(Event{ - Type: eventType, - Address: c.addr, - Error: err, - }) - } - } - ``` - -#### 🎯 **兼容性评估** - -Go 实现实现了与 Java 版本的**高度兼容性**: - -- **协议级别**:100% 兼容 Netty 消息格式 -- **消息类型**:所有核心消息类型都已实现 -- **通信模式**:支持同步和异步模式 -- **连接管理**:具有自动恢复功能的强大连接处理 -- **错误处理**:全面的错误处理 - -#### 📋 **建议** - -1. **生产使用**: - - - 根据具体监控要求实现实际任务处理逻辑 - - 添加全面的日志记录和监控 - - 实现配置验证和管理 - - 添加与 Java 服务器的集成测试 - -2. **开发使用**: - - - 当前实现提供了坚实的基础 - - 所有核心通信模式都已正确实现 - - 协议兼容性得到了彻底解决 - - 可扩展性已构建到架构中 - -3. **测试策略**: - - 与实际 Java 服务器部署一起测试 - - 验证消息格式兼容性 - - 测试连接恢复场景 - - 验证负载下的性能 - -Go 采集器实现成功地重新创建了 Java 版本的核心通信功能,为 Go 中的 HertzBeat 监控数据采集提供了坚实的基础。 - -## 🛠️ 贡献 +## 贡献 欢迎贡献!请查看 [CONTRIBUTING.md](CONTRIBUTING.md) 了解详细信息,包括代码、文档、测试和讨论。 -## 📄 许可证 +## 许可证 本项目基于 [Apache 2.0 许可证](LICENSE) 许可。 -## 🌐 English Version +## English Version For English documentation, please see [README.md](README.md). diff --git a/README.md b/README.md index 240db01..43b3d5e 100644 --- a/README.md +++ b/README.md @@ -5,647 +5,20 @@ HertzBeat-Collector-Go is the Go implementation of the collector for [Apache HertzBeat](https://github.com/apache/hertzbeat). It supports multi-protocol and multi-type monitoring data collection, featuring high performance, easy extensibility, and seamless integration. -## ✨ Features +## Quick Start -- Supports various protocols (HTTP, JDBC, SNMP, SSH, etc.) for monitoring data collection -- Flexible and extensible task scheduling, job management, and collection strategies -- Clean architecture, easy for secondary development and integration -- Rich development, testing, and deployment scripts -- Comprehensive documentation and community support - -## 📂 Project Structure - -```text -. -├── cmd/ # Main entry point -├── internal/ # Core collector implementation and common modules -│ ├── collector/ # Various collectors -│ ├── common/ # Common modules (scheduling, jobs, types, logging, etc.) -│ └── util/ # Utilities -├── api/ # Protocol definitions (protobuf) -├── examples/ # Example code -├── docs/ # Architecture and development docs -├── tools/ # Build, CI, scripts, and tools -├── Makefile # Build entry -└── README.md # Project description -``` - -## Configuration Architecture - -### Unified Configuration System - -The collector implements a unified configuration system with three main components: - -#### 1. ConfigFactory - -Central configuration factory that provides: - -- Default values management -- Environment variable processing -- Configuration validation -- Utility methods for configuration manipulation - -```go -// Create configuration with defaults -factory := config.NewConfigFactory() -cfg := factory.CreateDefaultConfig() - -// Create from environment variables -envCfg := factory.CreateFromEnv() - -// Merge file config with environment overrides -mergedCfg := factory.MergeWithEnv(fileCfg) - -// Validate configuration -if err := factory.ValidateConfig(cfg); err != nil { - log.Fatal("Invalid configuration:", err) -} -``` - -#### 2. Configuration Entry Points - -Three distinct entry points for different use cases: - -- **`config.LoadFromFile(path)`**: File-only configuration loading -- **`config.LoadFromEnv()`**: Environment-only configuration loading -- **`config.LoadUnified(path)`**: Combined file + environment loading (recommended) - -#### 3. Configuration Structure - -```go -type CollectorConfig struct { - Collector CollectorSection `yaml:"collector"` -} - -type CollectorSection struct { - Info CollectorInfo `yaml:"info"` - Log CollectorLogConfig `yaml:"log"` - Manager ManagerConfig `yaml:"manager"` - Identity string `yaml:"identity"` - Mode string `yaml:"mode"` -} - -type ManagerConfig struct { - Host string `yaml:"host"` - Port string `yaml:"port"` - Protocol string `yaml:"protocol"` -} +```shell +make run ``` -#### 4. Configuration Validation - -The system includes comprehensive validation: - -- **Required fields**: Identity, mode, manager host/port -- **Value validation**: Port numbers, protocol types, mode values -- **Format validation**: IP addresses, log levels - -#### 5. Default Values - -| Field | Default Value | Description | -| ---------------- | ------------------------ | ---------------------- | -| Identity | `hertzbeat-collector-go` | Collector identifier | -| Mode | `public` | Collector mode | -| Collector.Name | `hertzbeat-collector-go` | Collector service name | -| Collector.IP | `127.0.0.1` | Collector bind address | -| Collector.Port | `8080` | Collector service port | -| Manager.Host | `127.0.0.1` | Manager server host | -| Manager.Port | `1158` | Manager server port | -| Manager.Protocol | `netty` | Communication protocol | -| Log.Level | `info` | Logging level | - -### Migration from Legacy Configuration - -If you have existing configurations, here's how to migrate: - -#### Legacy Format (transport.yaml) - -```yaml -server: - host: "0.0.0.0" - port: 1158 -transport: - protocol: "netty" - server_addr: "127.0.0.1:1158" -``` - -#### New Format (hertzbeat-collector.yaml) - -```yaml -collector: - info: - name: hertzbeat-collector-go - ip: 0.0.0.0 - port: 8080 - manager: - host: 127.0.0.1 - port: 1158 - protocol: netty - identity: hertzbeat-collector-go - mode: public -``` - -## �🚀 Quick Start - -### 1. Build and Run - -```bash -# Install dependencies -go mod tidy - -# Build -make build - -# Run -./bin/collector server --config etc/hertzbeat-collector.yaml -``` - -### 2. Configuration Options - -The collector supports multiple configuration methods with a unified configuration system: - -#### File-based Configuration - -```bash -# Run with configuration file -./bin/collector server --config etc/hertzbeat-collector.yaml -``` - -Example configuration file (`etc/hertzbeat-collector.yaml`): - -```yaml -collector: - info: - name: hertzbeat-collector-go - ip: 127.0.0.1 - port: 8080 - - log: - level: debug - - # Manager/Transport configuration - manager: - host: 127.0.0.1 - port: 1158 - protocol: netty - - # Collector identity and mode - identity: hertzbeat-collector-go - mode: public -``` - -#### Environment Variables (Docker Compatible) - -The Go version is fully compatible with the Java version's environment variable configuration: - -```bash -# Set environment variables -export IDENTITY=local -export COLLECTOR_NAME=hertzbeat-collector-go -export COLLECTOR_IP=127.0.0.1 -export COLLECTOR_PORT=8080 -export MANAGER_HOST=192.168.97.0 -export MANAGER_PORT=1158 -export MANAGER_PROTOCOL=grpc -export MODE=public -export LOG_LEVEL=info - -# Run with environment variables -./bin/collector server - -# Or use Docker -docker run -d \ - -e IDENTITY=local \ - -e MANAGER_HOST=192.168.97.0 \ - -e MANAGER_PORT=1158 \ - -e MANAGER_PROTOCOL=grpc \ - -e MODE=public \ - --name hertzbeat-collector-go \ - hertzbeat-collector-go -``` - -#### Unified Configuration (Recommended) - -The collector uses a unified configuration system that supports both file and environment variable configurations: - -- **File + Environment**: Environment variables override file settings -- **Environment Only**: Pure environment variable configuration -- **File Only**: Pure file-based configuration - -Configuration precedence (highest to lowest): - -1. Environment variables -2. Configuration file values -3. Built-in defaults - -#### Supported Environment Variables - -| Environment Variable | Description | Default Value | -| -------------------- | ----------------------------------- | ------------------------ | -| `IDENTITY` | Collector identity | `hertzbeat-collector-go` | -| `MODE` | Collector mode (`public`/`private`) | `public` | -| `COLLECTOR_NAME` | Collector name | `hertzbeat-collector-go` | -| `COLLECTOR_IP` | Collector bind IP | `127.0.0.1` | -| `COLLECTOR_PORT` | Collector bind port | `8080` | -| `MANAGER_HOST` | Manager server host | `127.0.0.1` | -| `MANAGER_PORT` | Manager server port | `1158` | -| `MANAGER_PROTOCOL` | Protocol (`netty`/`grpc`) | `netty` | -| `LOG_LEVEL` | Log level | `info` | - -### 3. Examples - -See `examples/` directory for various usage examples: - -- `examples/main.go` - Main example with environment variables -- `examples/README.md` - Complete usage guide -- `examples/Dockerfile` - Docker build example - -## 🔄 Java Server Integration - -This Go collector is designed to be compatible with the Java version of HertzBeat manager server. The transport layer supports both gRPC and Netty protocols for seamless integration. - -### Protocol Support - -The Go collector supports two communication protocols: - -1. **Netty Protocol** (Recommended for Java server compatibility) - - - Uses length-prefixed protobuf message format - - Compatible with Java Netty server implementation - - Default port: 1158 - -2. **gRPC Protocol** - - Uses standard gRPC with protobuf - - Supports bidirectional streaming - - Default port: 1159 - -### Configuration - -#### Basic Configuration - -The collector supports flexible configuration through multiple entry points: - -```yaml -# etc/hertzbeat-collector.yaml -collector: - info: - name: hertzbeat-collector-go - ip: 127.0.0.1 - port: 8080 - - log: - level: debug - - # Manager/Transport configuration - manager: - host: 127.0.0.1 - port: 1158 - protocol: netty - - # Collector identity and mode - identity: hertzbeat-collector-go - mode: public -``` - -#### Configuration Loading Methods - -The collector provides three configuration loading methods: - -1. **File-only Configuration**: - - ```go - import "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config" - - cfg, err := config.LoadFromFile("etc/hertzbeat-collector.yaml") - if err != nil { - log.Fatal("Failed to load config:", err) - } - ``` - -2. **Environment-only Configuration**: - - ```go - import "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config" - - cfg := config.LoadFromEnv() - ``` - -3. **Unified Configuration (Recommended)**: - - ```go - import "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config" - - // Environment variables override file values - cfg, err := config.LoadUnified("etc/hertzbeat-collector.yaml") - if err != nil { - log.Fatal("Failed to load config:", err) - } - ``` - -#### Connecting to Java Server - -```go -package main - -import ( - "context" - "log" - "os" - "os/signal" - "syscall" - "time" - - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/transport" -) - -func main() { - // Load configuration using unified loader (file + env) - cfg, err := config.LoadUnified("etc/hertzbeat-collector.yaml") - if err != nil { - log.Fatal("Failed to load configuration:", err) - } - - // Create transport runner with unified config - runner := transport.New(cfg) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Start transport in background - go func() { - if err := runner.Start(ctx); err != nil { - log.Printf("Failed to start transport: %v", err) - cancel() - } - }() - - // Wait for shutdown signal - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - <-sigChan - - log.Println("Shutting down...") - - if err := runner.Close(); err != nil { - log.Printf("Failed to close transport: %v", err) - } -} -``` - -### Direct Client Usage - -For more granular control, you can use the transport client directly: - -```go -package main - -import ( - "log" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport" - pb "hertzbeat.apache.org/hertzbeat-collector-go/api/cluster_msg" -) - -func main() { - // Create Netty client for Java server - factory := &transport.TransportClientFactory{} - client, err := factory.CreateClient("netty", "127.0.0.1:1158") - if err != nil { - log.Fatal("Failed to create client:", err) - } - - // Start client - if err := client.Start(); err != nil { - log.Fatal("Failed to start client:", err) - } - defer client.Shutdown() - - // Register message processor - client.RegisterProcessor(100, func(msg interface{}) (interface{}, error) { - if pbMsg, ok := msg.(*pb.Message); ok { - log.Printf("Received message: %s", string(pbMsg.Msg)) - return &pb.Message{ - Type: pb.MessageType_HEARTBEAT, - Direction: pb.Direction_RESPONSE, - Identity: pbMsg.Identity, - Msg: []byte("response"), - }, nil - } - return nil, nil - }) - - // Send heartbeat message - heartbeat := &pb.Message{ - Type: pb.MessageType_HEARTBEAT, - Direction: pb.Direction_REQUEST, - Identity: "go-collector", - Msg: []byte("heartbeat"), - } - - // Async send - if err := client.SendMsg(heartbeat); err != nil { - log.Printf("Failed to send message: %v", err) - } - - // Sync send with timeout - resp, err := client.SendMsgSync(heartbeat, 5000) - if err != nil { - log.Printf("Failed to send sync message: %v", err) - } else if resp != nil { - if pbResp, ok := resp.(*pb.Message); ok { - log.Printf("Received response: %s", string(pbResp.Msg)) - } - } -} -``` - -### Message Types - -The Go collector supports all message types defined in the Java version: - -| Message Type | Value | Description | -| ------------------- | ----- | ------------------------------- | -| HEARTBEAT | 0 | Heartbeat/health check | -| GO_ONLINE | 1 | Collector online notification | -| GO_OFFLINE | 2 | Collector offline notification | -| GO_CLOSE | 3 | Collector shutdown notification | -| ISSUE_CYCLIC_TASK | 4 | Issue cyclic collection task | -| DELETE_CYCLIC_TASK | 5 | Delete cyclic collection task | -| ISSUE_ONE_TIME_TASK | 6 | Issue one-time collection task | - -### Connection Management - -The transport layer provides robust connection management: - -- **Auto-reconnection**: Automatically attempts to reconnect when connection is lost -- **Connection monitoring**: Background monitoring of connection health -- **Heartbeat mechanism**: Regular heartbeat messages to maintain connection -- **Event handling**: Connection state change notifications (connected, disconnected, connection failed) - -### Error Handling - -The implementation includes comprehensive error handling: - -- **Connection timeouts**: Proper timeout handling for connection attempts -- **Message serialization**: Protobuf marshaling/unmarshaling error handling -- **Response correlation**: Proper matching of requests and responses using identity field -- **Graceful shutdown**: Clean shutdown procedures with context cancellation - -## 🔍 Code Logic Analysis and Compatibility - -### Implementation Status - -The Go collector implementation provides comprehensive compatibility with the Java version: - -#### ✅ **Fully Implemented Features** - -1. **Transport Layer Compatibility** - - - **Netty Protocol**: Complete implementation with length-prefixed message format - - **gRPC Protocol**: Full gRPC service implementation with bidirectional streaming - - **Message Types**: All core message types (HEARTBEAT, GO_ONLINE, GO_OFFLINE, etc.) are supported - - **Request/Response Pattern**: Proper handling of synchronous and asynchronous communication - -2. **Connection Management** - - - **Auto-reconnection**: Robust reconnection logic when connection is lost - - **Connection Monitoring**: Background health checks with deadline management - - **Event System**: Comprehensive event handling for connection state changes - - **Heartbeat Mechanism**: Regular heartbeat messages for connection maintenance - -3. **Message Processing** - - - **Processor Registry**: Dynamic message processor registration and dispatch - - **Response Correlation**: Proper request-response matching using identity field - - **Error Handling**: Comprehensive error handling throughout the message pipeline - - **Timeout Management**: Configurable timeouts for all operations - -4. **Protocol Compatibility** - - **Protobuf Messages**: Exact compatibility with Java protobuf definitions - - **Message Serialization**: Proper binary format handling for Netty protocol - - **Stream Processing**: Support for both unary and streaming gRPC operations - -#### ⚠️ **Areas for Improvement** - -1. **Task Processing Logic** - - - Current implementation returns placeholder responses for task processing - - Actual collection logic needs to be implemented based on specific requirements - - Task scheduling and execution engine needs integration - -2. **Configuration Management** - - - Configuration file format needs to be standardized with Java version - - Environment variable support could be enhanced - - Dynamic configuration reloading could be added - -3. **Monitoring and Metrics** - - Comprehensive metrics collection could be added - - Performance monitoring integration could be enhanced - - Health check endpoints could be exposed - -#### 🔧 **Technical Implementation Details** - -1. **Netty Protocol Implementation** - - ```go - // Length-prefixed message format for Java compatibility - func (c *NettyClient) writeMessage(msg *pb.Message) error { - data, err := proto.Marshal(msg) - if err != nil { - return fmt.Errorf("failed to marshal message: %w", err) - } - // Write length prefix (varint32) - length := len(data) - if err := binary.Write(c.writer, binary.BigEndian, uint32(length)); err != nil { - return fmt.Errorf("failed to write length: %w", err) - } - // Write message data - if _, err := c.writer.Write(data); err != nil { - return fmt.Errorf("failed to write message: %w", err) - } - return c.writer.Flush() - } - ``` - -2. **Response Future Pattern** - - ```go - // Synchronous communication using ResponseFuture - func (c *NettyClient) SendMsgSync(msg interface{}, timeoutMillis int) (interface{}, error) { - // Create response future for this request - future := NewResponseFuture() - c.responseTable[pbMsg.Identity] = future - defer delete(c.responseTable, pbMsg.Identity) - - // Send message - if err := c.writeMessage(pbMsg); err != nil { - future.PutError(err) - return nil, err - } - - // Wait for response with timeout - return future.Wait(time.Duration(timeoutMillis) * time.Millisecond) - } - ``` - -3. **Event-Driven Architecture** - - ```go - // Connection event handling - func (c *NettyClient) triggerEvent(eventType EventType, err error) { - if c.eventHandler != nil { - c.eventHandler(Event{ - Type: eventType, - Address: c.addr, - Error: err, - }) - } - } - ``` - -#### 🎯 **Compatibility Assessment** - -The Go implementation achieves **high compatibility** with the Java version: - -- **Protocol Level**: 100% compatible with Netty message format -- **Message Types**: All core message types implemented -- **Communication Patterns**: Both sync and async patterns supported -- **Connection Management**: Robust connection handling with auto-recovery -- **Error Handling**: Comprehensive error handling throughout - -#### 📋 **Recommendations** - -1. **For Production Use**: - - - Implement actual task processing logic based on specific monitoring requirements - - Add comprehensive logging and monitoring - - Implement configuration validation and management - - Add integration tests with Java server - -2. **For Development**: - - - The current implementation provides a solid foundation - - All core communication patterns are correctly implemented - - Protocol compatibility is thoroughly addressed - - Extensibility is built into the architecture - -3. **Testing Strategy**: - - Test with actual Java server deployment - - Verify message format compatibility - - Test connection recovery scenarios - - Validate performance under load - -The Go collector implementation successfully recreates the core communication capabilities of the Java version, providing a solid foundation for HertzBeat monitoring data collection in Go. - -## 🛠️ Contributing +## Contributing Contributions are welcome! Please see [CONTRIBUTING.md](CONTRIBUTING.md) for details, including code, documentation, tests, and discussions. -## 📄 License +## License This project is licensed under the [Apache 2.0 License](LICENSE). -## 🌐 中文版本 +## 中文版本 For Chinese documentation, please see [README-CN.md](README-CN.md). --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
