This is an automated email from the ASF dual-hosted git repository.
gaoxingcun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
The following commit(s) were added to refs/heads/main by this push:
new 06cea69 docs: refactor readme (#23)
06cea69 is described below
commit 06cea69a2c299962beebc847f77bbf7608aaf7ae
Author: shown <[email protected]>
AuthorDate: Mon Oct 20 17:25:05 2025 +0800
docs: refactor readme (#23)
Signed-off-by: yuluo-yx <[email protected]>
Co-authored-by: 铁甲小宝 <[email protected]>
---
README-CN.md | 639 +----------------------------------------------------------
README.md | 639 +----------------------------------------------------------
2 files changed, 12 insertions(+), 1266 deletions(-)
diff --git a/README-CN.md b/README-CN.md
index 81c7931..a0390a9 100644
--- a/README-CN.md
+++ b/README-CN.md
@@ -5,647 +5,20 @@
HertzBeat-Collector-Go 是 [Apache
HertzBeat](https://github.com/apache/hertzbeat) 的 Go
语言实现的数据采集器。它支持多协议、多类型的监控数据采集,具有高性能、易扩展、无缝集成的特点。
-## ✨ 特性
+## 快速开始
-- 支持多种协议(HTTP、JDBC、SNMP、SSH 等)的监控数据采集
-- 灵活可扩展的任务调度、作业管理和采集策略
-- 清晰的架构设计,易于二次开发和集成
-- 丰富的开发、测试和部署脚本
-- 完善的文档和社区支持
-
-## 📂 项目结构
-
-```text
-.
-├── cmd/ # 主入口点
-├── internal/ # 核心采集器实现和通用模块
-│ ├── collector/ # 各种采集器
-│ ├── common/ # 通用模块(调度、作业、类型、日志等)
-│ └── util/ # 工具类
-├── api/ # 协议定义(protobuf)
-├── examples/ # 示例代码
-├── docs/ # 架构和开发文档
-├── tools/ # 构建、CI、脚本和工具
-├── Makefile # 构建入口
-└── README-CN.md # 项目说明(中文)
-```
-
-## � 配置架构
-
-### 统一配置系统
-
-采集器实现了统一配置系统,包含三个主要组件:
-
-#### 1. ConfigFactory(配置工厂)
-
-中央配置工厂提供:
-
-- 默认值管理
-- 环境变量处理
-- 配置验证
-- 配置操作工具方法
-
-```go
-// 创建带默认值的配置
-factory := config.NewConfigFactory()
-cfg := factory.CreateDefaultConfig()
-
-// 从环境变量创建配置
-envCfg := factory.CreateFromEnv()
-
-// 合并文件配置与环境变量覆盖
-mergedCfg := factory.MergeWithEnv(fileCfg)
-
-// 验证配置
-if err := factory.ValidateConfig(cfg); err != nil {
- log.Fatal("配置无效:", err)
-}
-```
-
-#### 2. 配置入口点
-
-针对不同用例的三个独立入口点:
-
-- **`config.LoadFromFile(path)`**: 仅文件配置加载
-- **`config.LoadFromEnv()`**: 仅环境变量配置加载
-- **`config.LoadUnified(path)`**: 组合文件 + 环境变量加载(推荐)
-
-#### 3. 配置结构
-
-```go
-type CollectorConfig struct {
- Collector CollectorSection `yaml:"collector"`
-}
-
-type CollectorSection struct {
- Info CollectorInfo `yaml:"info"`
- Log CollectorLogConfig `yaml:"log"`
- Manager ManagerConfig `yaml:"manager"`
- Identity string `yaml:"identity"`
- Mode string `yaml:"mode"`
-}
-
-type ManagerConfig struct {
- Host string `yaml:"host"`
- Port string `yaml:"port"`
- Protocol string `yaml:"protocol"`
-}
+```shell
+make run
```
-#### 4. 配置验证
-
-系统包含全面的验证:
-
-- **必需字段**: 身份、模式、管理器主机/端口
-- **值验证**: 端口号、协议类型、模式值
-- **格式验证**: IP 地址、日志级别
-
-#### 5. 默认值
-
-| 字段 | 默认值 | 描述 |
-| ---------------- | ------------------------ | -------------- |
-| Identity | `hertzbeat-collector-go` | 采集器标识符 |
-| Mode | `public` | 采集器模式 |
-| Collector.Name | `hertzbeat-collector-go` | 采集器服务名称 |
-| Collector.IP | `127.0.0.1` | 采集器绑定地址 |
-| Collector.Port | `8080` | 采集器服务端口 |
-| Manager.Host | `127.0.0.1` | 管理服务器主机 |
-| Manager.Port | `1158` | 管理服务器端口 |
-| Manager.Protocol | `netty` | 通信协议 |
-| Log.Level | `info` | 日志级别 |
-
-### 从旧配置迁移
-
-如果您有现有配置,以下是迁移方法:
-
-#### 旧格式 (transport.yaml)
-
-```yaml
-server:
- host: "0.0.0.0"
- port: 1158
-transport:
- protocol: "netty"
- server_addr: "127.0.0.1:1158"
-```
-
-#### 新格式 (hertzbeat-collector.yaml)
-
-```yaml
-collector:
- info:
- name: hertzbeat-collector-go
- ip: 0.0.0.0
- port: 8080
- manager:
- host: 127.0.0.1
- port: 1158
- protocol: netty
- identity: hertzbeat-collector-go
- mode: public
-```
-
-## �🚀 快速开始
-
-### 1. 构建和运行
-
-```bash
-# 安装依赖
-go mod tidy
-
-# 构建
-make build
-
-# 运行
-./bin/collector server --config etc/hertzbeat-collector.yaml
-```
-
-### 2. 配置选项
-
-采集器支持多种配置方法,具有统一的配置系统:
-
-#### 基于文件的配置
-
-```bash
-# 使用配置文件运行
-./bin/collector server --config etc/hertzbeat-collector.yaml
-```
-
-配置文件示例(`etc/hertzbeat-collector.yaml`):
-
-```yaml
-collector:
- info:
- name: hertzbeat-collector-go
- ip: 127.0.0.1
- port: 8080
-
- log:
- level: debug
-
- # 管理器/传输配置
- manager:
- host: 127.0.0.1
- port: 1158
- protocol: netty
-
- # 采集器身份和模式
- identity: hertzbeat-collector-go
- mode: public
-```
-
-#### 环境变量配置(Docker 兼容)
-
-Go 版本完全兼容 Java 版本的环境变量配置:
-
-```bash
-# 设置环境变量
-export IDENTITY=local
-export COLLECTOR_NAME=hertzbeat-collector-go
-export COLLECTOR_IP=127.0.0.1
-export COLLECTOR_PORT=8080
-export MANAGER_HOST=192.168.97.0
-export MANAGER_PORT=1158
-export MANAGER_PROTOCOL=grpc
-export MODE=public
-export LOG_LEVEL=info
-
-# 使用环境变量运行
-./bin/collector server
-
-# 或使用 Docker
-docker run -d \
- -e IDENTITY=local \
- -e MANAGER_HOST=192.168.97.0 \
- -e MANAGER_PORT=1158 \
- -e MANAGER_PROTOCOL=grpc \
- -e MODE=public \
- --name hertzbeat-collector-go \
- hertzbeat-collector-go
-```
-
-#### 统一配置(推荐)
-
-采集器使用统一配置系统,支持文件和环境变量配置:
-
-- **文件 + 环境变量**:环境变量覆盖文件设置
-- **仅环境变量**:纯环境变量配置
-- **仅文件**:纯基于文件的配置
-
-配置优先级(从高到低):
-
-1. 环境变量
-2. 配置文件值
-3. 内置默认值
-
-#### 支持的环境变量
-
-| 环境变量 | 描述 | 默认值 |
-| ------------------ | -------------------------------- |
------------------------ |
-| `IDENTITY` | 采集器身份 | `hertzbeat-collector-go` |
-| `MODE` | 采集器模式(`public`/`private`) | `public` |
-| `COLLECTOR_NAME` | 采集器名称 | `hertzbeat-collector-go` |
-| `COLLECTOR_IP` | 采集器绑定 IP | `127.0.0.1` |
-| `COLLECTOR_PORT` | 采集器绑定端口 | `8080` |
-| `MANAGER_HOST` | 管理服务器主机 | `127.0.0.1` |
-| `MANAGER_PORT` | 管理服务器端口 | `1158` |
-| `MANAGER_PROTOCOL` | 协议(`netty`/`grpc`) | `netty`
|
-| `LOG_LEVEL` | 日志级别 | `info`
|
-
-### 3. 示例
-
-查看 `examples/` 目录获取各种使用示例:
-
-- `examples/main.go` - 使用环境变量的主要示例
-- `examples/README.md` - 完整使用指南
-- `examples/Dockerfile` - Docker 构建示例
-
-## 🔄 Java 服务器集成
-
-该 Go 采集器设计为与 Java 版本的 HertzBeat 管理服务器兼容。传输层支持 gRPC 和 Netty 协议,实现无缝集成。
-
-### 协议支持
-
-Go 采集器支持两种通信协议:
-
-1. **Netty 协议**(推荐用于 Java 服务器兼容性)
-
- - 使用长度前缀的 protobuf 消息格式
- - 与 Java Netty 服务器实现兼容
- - 默认端口:1158
-
-2. **gRPC 协议**
- - 使用标准 gRPC 和 protobuf
- - 支持双向流式通信
- - 默认端口:1159
-
-### 配置
-
-#### 基础配置
-
-采集器通过多种入口点支持灵活配置:
-
-```yaml
-# etc/hertzbeat-collector.yaml
-collector:
- info:
- name: hertzbeat-collector-go
- ip: 127.0.0.1
- port: 8080
-
- log:
- level: debug
-
- # 管理器/传输配置
- manager:
- host: 127.0.0.1
- port: 1158
- protocol: netty
-
- # 采集器身份和模式
- identity: hertzbeat-collector-go
- mode: public
-```
-
-#### 配置加载方法
-
-采集器提供三种配置加载方法:
-
-1. **仅文件配置**:
-
- ```go
- import
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config"
-
- cfg, err := config.LoadFromFile("etc/hertzbeat-collector.yaml")
- if err != nil {
- log.Fatal("配置加载失败:", err)
- }
- ```
-
-2. **仅环境变量配置**:
-
- ```go
- import
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config"
-
- cfg := config.LoadFromEnv()
- ```
-
-3. **统一配置(推荐)**:
-
- ```go
- import
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config"
-
- // 环境变量覆盖文件值
- cfg, err := config.LoadUnified("etc/hertzbeat-collector.yaml")
- if err != nil {
- log.Fatal("配置加载失败:", err)
- }
- ```
-
-#### 连接 Java 服务器
-
-```go
-package main
-
-import (
- "context"
- "log"
- "os"
- "os/signal"
- "syscall"
- "time"
-
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/transport"
-)
-
-func main() {
- // 使用统一加载器加载配置(文件 + 环境变量)
- cfg, err := config.LoadUnified("etc/hertzbeat-collector.yaml")
- if err != nil {
- log.Fatal("配置加载失败:", err)
- }
-
- // 使用统一配置创建传输运行器
- runner := transport.New(cfg)
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- // 在后台启动传输
- go func() {
- if err := runner.Start(ctx); err != nil {
- log.Printf("启动传输失败: %v", err)
- cancel()
- }
- }()
-
- // 等待关闭信号
- sigChan := make(chan os.Signal, 1)
- signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
- <-sigChan
-
- log.Println("正在关闭...")
-
- if err := runner.Close(); err != nil {
- log.Printf("关闭传输失败: %v", err)
- }
-}
-```
-
-### 直接客户端使用
-
-为了更细粒度的控制,您可以直接使用传输客户端:
-
-```go
-package main
-
-import (
- "log"
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
- pb "hertzbeat.apache.org/hertzbeat-collector-go/api/cluster_msg"
-)
-
-func main() {
- // 创建 Java 服务器的 Netty 客户端
- factory := &transport.TransportClientFactory{}
- client, err := factory.CreateClient("netty", "127.0.0.1:1158")
- if err != nil {
- log.Fatal("创建客户端失败:", err)
- }
-
- // 启动客户端
- if err := client.Start(); err != nil {
- log.Fatal("启动客户端失败:", err)
- }
- defer client.Shutdown()
-
- // 注册消息处理器
- client.RegisterProcessor(100, func(msg interface{}) (interface{}, error) {
- if pbMsg, ok := msg.(*pb.Message); ok {
- log.Printf("收到消息: %s", string(pbMsg.Msg))
- return &pb.Message{
- Type: pb.MessageType_HEARTBEAT,
- Direction: pb.Direction_RESPONSE,
- Identity: pbMsg.Identity,
- Msg: []byte("response"),
- }, nil
- }
- return nil, nil
- })
-
- // 发送心跳消息
- heartbeat := &pb.Message{
- Type: pb.MessageType_HEARTBEAT,
- Direction: pb.Direction_REQUEST,
- Identity: "go-collector",
- Msg: []byte("heartbeat"),
- }
-
- // 异步发送
- if err := client.SendMsg(heartbeat); err != nil {
- log.Printf("发送消息失败: %v", err)
- }
-
- // 同步发送,带超时
- resp, err := client.SendMsgSync(heartbeat, 5000)
- if err != nil {
- log.Printf("发送同步消息失败: %v", err)
- } else if resp != nil {
- if pbResp, ok := resp.(*pb.Message); ok {
- log.Printf("收到响应: %s", string(pbResp.Msg))
- }
- }
-}
-```
-
-### 消息类型
-
-Go 采集器支持 Java 版本中定义的所有消息类型:
-
-| 消息类型 | 值 | 描述 |
-| ------------------- | --- | ------------------ |
-| HEARTBEAT | 0 | 心跳/健康检查 |
-| GO_ONLINE | 1 | 采集器上线通知 |
-| GO_OFFLINE | 2 | 采集器下线通知 |
-| GO_CLOSE | 3 | 采集器关闭通知 |
-| ISSUE_CYCLIC_TASK | 4 | 发布周期性采集任务 |
-| DELETE_CYCLIC_TASK | 5 | 删除周期性采集任务 |
-| ISSUE_ONE_TIME_TASK | 6 | 发布一次性采集任务 |
-
-### 连接管理
-
-传输层提供了强大的连接管理功能:
-
-- **自动重连**:连接丢失时自动尝试重连
-- **连接监控**:后台监控连接健康状态
-- **心跳机制**:定期心跳消息以保持连接
-- **事件处理**:连接状态变更通知(已连接、断开连接、连接失败)
-
-### 错误处理
-
-该实现包含全面的错误处理:
-
-- **连接超时**:连接尝试的正确超时处理
-- **消息序列化**:Protobuf 编组/解组错误处理
-- **响应关联**:使用身份字段正确匹配请求和响应
-- **优雅关闭**:使用上下文取消的干净关闭程序
-
-## 🔍 代码逻辑分析和兼容性
-
-### 实现状态
-
-Go 采集器实现提供了与 Java 版本的全面兼容性:
-
-#### ✅ **完全实现的功能**
-
-1. **传输层兼容性**
-
- - **Netty 协议**:使用长度前缀消息格式的完整实现
- - **gRPC 协议**:完整的 gRPC 服务实现,支持双向流式通信
- - **消息类型**:支持所有核心消息类型(HEARTBEAT、GO_ONLINE、GO_OFFLINE 等)
- - **请求/响应模式**:正确处理同步和异步通信
-
-2. **连接管理**
-
- - **自动重连**:连接丢失时的强大重连逻辑
- - **连接监控**:带截止时间管理的后台健康检查
- - **事件系统**:连接状态变更的全面事件处理
- - **心跳机制**:用于连接维护的定期心跳消息
-
-3. **消息处理**
-
- - **处理器注册**:动态消息处理器注册和分发
- - **响应关联**:使用身份字段正确请求-响应匹配
- - **错误处理**:整个消息管道中的全面错误处理
- - **超时管理**:所有操作的可配置超时
-
-4. **协议兼容性**
- - **Protobuf 消息**:与 Java protobuf 定义完全兼容
- - **消息序列化**:Netty 协议的正确二进制格式处理
- - **流处理**:支持一元和流式 gRPC 操作
-
-#### ⚠️ **改进领域**
-
-1. **任务处理逻辑**
-
- - 当前实现为任务处理返回占位符响应
- - 需要根据具体要求实现实际采集逻辑
- - 任务调度和执行引擎需要集成
-
-2. **配置管理**
-
- - 配置文件格式需要与 Java 版本标准化
- - 环境变量支持可以增强
- - 可以添加动态配置重载
-
-3. **监控和指标**
- - 可以添加全面的指标收集
- - 可以增强性能监控集成
- - 可以暴露健康检查端点
-
-#### 🔧 **技术实现细节**
-
-1. **Netty 协议实现**
-
- ```go
- // Java 兼容的长度前缀消息格式
- func (c *NettyClient) writeMessage(msg *pb.Message) error {
- data, err := proto.Marshal(msg)
- if err != nil {
- return fmt.Errorf("消息编组失败: %w", err)
- }
- // 写入长度前缀(varint32)
- length := len(data)
- if err := binary.Write(c.writer, binary.BigEndian, uint32(length)); err
!= nil {
- return fmt.Errorf("写入长度失败: %w", err)
- }
- // 写入消息数据
- if _, err := c.writer.Write(data); err != nil {
- return fmt.Errorf("写入消息失败: %w", err)
- }
- return c.writer.Flush()
- }
- ```
-
-2. **响应未来模式**
-
- ```go
- // 使用 ResponseFuture 进行同步通信
- func (c *NettyClient) SendMsgSync(msg interface{}, timeoutMillis int)
(interface{}, error) {
- // 为此请求创建响应未来
- future := NewResponseFuture()
- c.responseTable[pbMsg.Identity] = future
- defer delete(c.responseTable, pbMsg.Identity)
-
- // 发送消息
- if err := c.writeMessage(pbMsg); err != nil {
- future.PutError(err)
- return nil, err
- }
-
- // 等待带超时的响应
- return future.Wait(time.Duration(timeoutMillis) * time.Millisecond)
- }
- ```
-
-3. **事件驱动架构**
-
- ```go
- // 连接事件处理
- func (c *NettyClient) triggerEvent(eventType EventType, err error) {
- if c.eventHandler != nil {
- c.eventHandler(Event{
- Type: eventType,
- Address: c.addr,
- Error: err,
- })
- }
- }
- ```
-
-#### 🎯 **兼容性评估**
-
-Go 实现实现了与 Java 版本的**高度兼容性**:
-
-- **协议级别**:100% 兼容 Netty 消息格式
-- **消息类型**:所有核心消息类型都已实现
-- **通信模式**:支持同步和异步模式
-- **连接管理**:具有自动恢复功能的强大连接处理
-- **错误处理**:全面的错误处理
-
-#### 📋 **建议**
-
-1. **生产使用**:
-
- - 根据具体监控要求实现实际任务处理逻辑
- - 添加全面的日志记录和监控
- - 实现配置验证和管理
- - 添加与 Java 服务器的集成测试
-
-2. **开发使用**:
-
- - 当前实现提供了坚实的基础
- - 所有核心通信模式都已正确实现
- - 协议兼容性得到了彻底解决
- - 可扩展性已构建到架构中
-
-3. **测试策略**:
- - 与实际 Java 服务器部署一起测试
- - 验证消息格式兼容性
- - 测试连接恢复场景
- - 验证负载下的性能
-
-Go 采集器实现成功地重新创建了 Java 版本的核心通信功能,为 Go 中的 HertzBeat 监控数据采集提供了坚实的基础。
-
-## 🛠️ 贡献
+## 贡献
欢迎贡献!请查看 [CONTRIBUTING.md](CONTRIBUTING.md) 了解详细信息,包括代码、文档、测试和讨论。
-## 📄 许可证
+## 许可证
本项目基于 [Apache 2.0 许可证](LICENSE) 许可。
-## 🌐 English Version
+## English Version
For English documentation, please see [README.md](README.md).
diff --git a/README.md b/README.md
index 240db01..43b3d5e 100644
--- a/README.md
+++ b/README.md
@@ -5,647 +5,20 @@
HertzBeat-Collector-Go is the Go implementation of the collector for [Apache
HertzBeat](https://github.com/apache/hertzbeat). It supports multi-protocol and
multi-type monitoring data collection, featuring high performance, easy
extensibility, and seamless integration.
-## ✨ Features
+## Quick Start
-- Supports various protocols (HTTP, JDBC, SNMP, SSH, etc.) for monitoring data
collection
-- Flexible and extensible task scheduling, job management, and collection
strategies
-- Clean architecture, easy for secondary development and integration
-- Rich development, testing, and deployment scripts
-- Comprehensive documentation and community support
-
-## 📂 Project Structure
-
-```text
-.
-├── cmd/ # Main entry point
-├── internal/ # Core collector implementation and common modules
-│ ├── collector/ # Various collectors
-│ ├── common/ # Common modules (scheduling, jobs, types, logging,
etc.)
-│ └── util/ # Utilities
-├── api/ # Protocol definitions (protobuf)
-├── examples/ # Example code
-├── docs/ # Architecture and development docs
-├── tools/ # Build, CI, scripts, and tools
-├── Makefile # Build entry
-└── README.md # Project description
-```
-
-## Configuration Architecture
-
-### Unified Configuration System
-
-The collector implements a unified configuration system with three main
components:
-
-#### 1. ConfigFactory
-
-Central configuration factory that provides:
-
-- Default values management
-- Environment variable processing
-- Configuration validation
-- Utility methods for configuration manipulation
-
-```go
-// Create configuration with defaults
-factory := config.NewConfigFactory()
-cfg := factory.CreateDefaultConfig()
-
-// Create from environment variables
-envCfg := factory.CreateFromEnv()
-
-// Merge file config with environment overrides
-mergedCfg := factory.MergeWithEnv(fileCfg)
-
-// Validate configuration
-if err := factory.ValidateConfig(cfg); err != nil {
- log.Fatal("Invalid configuration:", err)
-}
-```
-
-#### 2. Configuration Entry Points
-
-Three distinct entry points for different use cases:
-
-- **`config.LoadFromFile(path)`**: File-only configuration loading
-- **`config.LoadFromEnv()`**: Environment-only configuration loading
-- **`config.LoadUnified(path)`**: Combined file + environment loading
(recommended)
-
-#### 3. Configuration Structure
-
-```go
-type CollectorConfig struct {
- Collector CollectorSection `yaml:"collector"`
-}
-
-type CollectorSection struct {
- Info CollectorInfo `yaml:"info"`
- Log CollectorLogConfig `yaml:"log"`
- Manager ManagerConfig `yaml:"manager"`
- Identity string `yaml:"identity"`
- Mode string `yaml:"mode"`
-}
-
-type ManagerConfig struct {
- Host string `yaml:"host"`
- Port string `yaml:"port"`
- Protocol string `yaml:"protocol"`
-}
+```shell
+make run
```
-#### 4. Configuration Validation
-
-The system includes comprehensive validation:
-
-- **Required fields**: Identity, mode, manager host/port
-- **Value validation**: Port numbers, protocol types, mode values
-- **Format validation**: IP addresses, log levels
-
-#### 5. Default Values
-
-| Field | Default Value | Description |
-| ---------------- | ------------------------ | ---------------------- |
-| Identity | `hertzbeat-collector-go` | Collector identifier |
-| Mode | `public` | Collector mode |
-| Collector.Name | `hertzbeat-collector-go` | Collector service name |
-| Collector.IP | `127.0.0.1` | Collector bind address |
-| Collector.Port | `8080` | Collector service port |
-| Manager.Host | `127.0.0.1` | Manager server host |
-| Manager.Port | `1158` | Manager server port |
-| Manager.Protocol | `netty` | Communication protocol |
-| Log.Level | `info` | Logging level |
-
-### Migration from Legacy Configuration
-
-If you have existing configurations, here's how to migrate:
-
-#### Legacy Format (transport.yaml)
-
-```yaml
-server:
- host: "0.0.0.0"
- port: 1158
-transport:
- protocol: "netty"
- server_addr: "127.0.0.1:1158"
-```
-
-#### New Format (hertzbeat-collector.yaml)
-
-```yaml
-collector:
- info:
- name: hertzbeat-collector-go
- ip: 0.0.0.0
- port: 8080
- manager:
- host: 127.0.0.1
- port: 1158
- protocol: netty
- identity: hertzbeat-collector-go
- mode: public
-```
-
-## �🚀 Quick Start
-
-### 1. Build and Run
-
-```bash
-# Install dependencies
-go mod tidy
-
-# Build
-make build
-
-# Run
-./bin/collector server --config etc/hertzbeat-collector.yaml
-```
-
-### 2. Configuration Options
-
-The collector supports multiple configuration methods with a unified
configuration system:
-
-#### File-based Configuration
-
-```bash
-# Run with configuration file
-./bin/collector server --config etc/hertzbeat-collector.yaml
-```
-
-Example configuration file (`etc/hertzbeat-collector.yaml`):
-
-```yaml
-collector:
- info:
- name: hertzbeat-collector-go
- ip: 127.0.0.1
- port: 8080
-
- log:
- level: debug
-
- # Manager/Transport configuration
- manager:
- host: 127.0.0.1
- port: 1158
- protocol: netty
-
- # Collector identity and mode
- identity: hertzbeat-collector-go
- mode: public
-```
-
-#### Environment Variables (Docker Compatible)
-
-The Go version is fully compatible with the Java version's environment
variable configuration:
-
-```bash
-# Set environment variables
-export IDENTITY=local
-export COLLECTOR_NAME=hertzbeat-collector-go
-export COLLECTOR_IP=127.0.0.1
-export COLLECTOR_PORT=8080
-export MANAGER_HOST=192.168.97.0
-export MANAGER_PORT=1158
-export MANAGER_PROTOCOL=grpc
-export MODE=public
-export LOG_LEVEL=info
-
-# Run with environment variables
-./bin/collector server
-
-# Or use Docker
-docker run -d \
- -e IDENTITY=local \
- -e MANAGER_HOST=192.168.97.0 \
- -e MANAGER_PORT=1158 \
- -e MANAGER_PROTOCOL=grpc \
- -e MODE=public \
- --name hertzbeat-collector-go \
- hertzbeat-collector-go
-```
-
-#### Unified Configuration (Recommended)
-
-The collector uses a unified configuration system that supports both file and
environment variable configurations:
-
-- **File + Environment**: Environment variables override file settings
-- **Environment Only**: Pure environment variable configuration
-- **File Only**: Pure file-based configuration
-
-Configuration precedence (highest to lowest):
-
-1. Environment variables
-2. Configuration file values
-3. Built-in defaults
-
-#### Supported Environment Variables
-
-| Environment Variable | Description | Default Value
|
-| -------------------- | ----------------------------------- |
------------------------ |
-| `IDENTITY` | Collector identity |
`hertzbeat-collector-go` |
-| `MODE` | Collector mode (`public`/`private`) | `public`
|
-| `COLLECTOR_NAME` | Collector name |
`hertzbeat-collector-go` |
-| `COLLECTOR_IP` | Collector bind IP | `127.0.0.1`
|
-| `COLLECTOR_PORT` | Collector bind port | `8080`
|
-| `MANAGER_HOST` | Manager server host | `127.0.0.1`
|
-| `MANAGER_PORT` | Manager server port | `1158`
|
-| `MANAGER_PROTOCOL` | Protocol (`netty`/`grpc`) | `netty`
|
-| `LOG_LEVEL` | Log level | `info`
|
-
-### 3. Examples
-
-See `examples/` directory for various usage examples:
-
-- `examples/main.go` - Main example with environment variables
-- `examples/README.md` - Complete usage guide
-- `examples/Dockerfile` - Docker build example
-
-## 🔄 Java Server Integration
-
-This Go collector is designed to be compatible with the Java version of
HertzBeat manager server. The transport layer supports both gRPC and Netty
protocols for seamless integration.
-
-### Protocol Support
-
-The Go collector supports two communication protocols:
-
-1. **Netty Protocol** (Recommended for Java server compatibility)
-
- - Uses length-prefixed protobuf message format
- - Compatible with Java Netty server implementation
- - Default port: 1158
-
-2. **gRPC Protocol**
- - Uses standard gRPC with protobuf
- - Supports bidirectional streaming
- - Default port: 1159
-
-### Configuration
-
-#### Basic Configuration
-
-The collector supports flexible configuration through multiple entry points:
-
-```yaml
-# etc/hertzbeat-collector.yaml
-collector:
- info:
- name: hertzbeat-collector-go
- ip: 127.0.0.1
- port: 8080
-
- log:
- level: debug
-
- # Manager/Transport configuration
- manager:
- host: 127.0.0.1
- port: 1158
- protocol: netty
-
- # Collector identity and mode
- identity: hertzbeat-collector-go
- mode: public
-```
-
-#### Configuration Loading Methods
-
-The collector provides three configuration loading methods:
-
-1. **File-only Configuration**:
-
- ```go
- import
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config"
-
- cfg, err := config.LoadFromFile("etc/hertzbeat-collector.yaml")
- if err != nil {
- log.Fatal("Failed to load config:", err)
- }
- ```
-
-2. **Environment-only Configuration**:
-
- ```go
- import
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config"
-
- cfg := config.LoadFromEnv()
- ```
-
-3. **Unified Configuration (Recommended)**:
-
- ```go
- import
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config"
-
- // Environment variables override file values
- cfg, err := config.LoadUnified("etc/hertzbeat-collector.yaml")
- if err != nil {
- log.Fatal("Failed to load config:", err)
- }
- ```
-
-#### Connecting to Java Server
-
-```go
-package main
-
-import (
- "context"
- "log"
- "os"
- "os/signal"
- "syscall"
- "time"
-
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/transport"
-)
-
-func main() {
- // Load configuration using unified loader (file + env)
- cfg, err := config.LoadUnified("etc/hertzbeat-collector.yaml")
- if err != nil {
- log.Fatal("Failed to load configuration:", err)
- }
-
- // Create transport runner with unified config
- runner := transport.New(cfg)
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- // Start transport in background
- go func() {
- if err := runner.Start(ctx); err != nil {
- log.Printf("Failed to start transport: %v", err)
- cancel()
- }
- }()
-
- // Wait for shutdown signal
- sigChan := make(chan os.Signal, 1)
- signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
- <-sigChan
-
- log.Println("Shutting down...")
-
- if err := runner.Close(); err != nil {
- log.Printf("Failed to close transport: %v", err)
- }
-}
-```
-
-### Direct Client Usage
-
-For more granular control, you can use the transport client directly:
-
-```go
-package main
-
-import (
- "log"
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
- pb "hertzbeat.apache.org/hertzbeat-collector-go/api/cluster_msg"
-)
-
-func main() {
- // Create Netty client for Java server
- factory := &transport.TransportClientFactory{}
- client, err := factory.CreateClient("netty", "127.0.0.1:1158")
- if err != nil {
- log.Fatal("Failed to create client:", err)
- }
-
- // Start client
- if err := client.Start(); err != nil {
- log.Fatal("Failed to start client:", err)
- }
- defer client.Shutdown()
-
- // Register message processor
- client.RegisterProcessor(100, func(msg interface{}) (interface{}, error) {
- if pbMsg, ok := msg.(*pb.Message); ok {
- log.Printf("Received message: %s", string(pbMsg.Msg))
- return &pb.Message{
- Type: pb.MessageType_HEARTBEAT,
- Direction: pb.Direction_RESPONSE,
- Identity: pbMsg.Identity,
- Msg: []byte("response"),
- }, nil
- }
- return nil, nil
- })
-
- // Send heartbeat message
- heartbeat := &pb.Message{
- Type: pb.MessageType_HEARTBEAT,
- Direction: pb.Direction_REQUEST,
- Identity: "go-collector",
- Msg: []byte("heartbeat"),
- }
-
- // Async send
- if err := client.SendMsg(heartbeat); err != nil {
- log.Printf("Failed to send message: %v", err)
- }
-
- // Sync send with timeout
- resp, err := client.SendMsgSync(heartbeat, 5000)
- if err != nil {
- log.Printf("Failed to send sync message: %v", err)
- } else if resp != nil {
- if pbResp, ok := resp.(*pb.Message); ok {
- log.Printf("Received response: %s", string(pbResp.Msg))
- }
- }
-}
-```
-
-### Message Types
-
-The Go collector supports all message types defined in the Java version:
-
-| Message Type | Value | Description |
-| ------------------- | ----- | ------------------------------- |
-| HEARTBEAT | 0 | Heartbeat/health check |
-| GO_ONLINE | 1 | Collector online notification |
-| GO_OFFLINE | 2 | Collector offline notification |
-| GO_CLOSE | 3 | Collector shutdown notification |
-| ISSUE_CYCLIC_TASK | 4 | Issue cyclic collection task |
-| DELETE_CYCLIC_TASK | 5 | Delete cyclic collection task |
-| ISSUE_ONE_TIME_TASK | 6 | Issue one-time collection task |
-
-### Connection Management
-
-The transport layer provides robust connection management:
-
-- **Auto-reconnection**: Automatically attempts to reconnect when connection
is lost
-- **Connection monitoring**: Background monitoring of connection health
-- **Heartbeat mechanism**: Regular heartbeat messages to maintain connection
-- **Event handling**: Connection state change notifications (connected,
disconnected, connection failed)
-
-### Error Handling
-
-The implementation includes comprehensive error handling:
-
-- **Connection timeouts**: Proper timeout handling for connection attempts
-- **Message serialization**: Protobuf marshaling/unmarshaling error handling
-- **Response correlation**: Proper matching of requests and responses using
identity field
-- **Graceful shutdown**: Clean shutdown procedures with context cancellation
-
-## 🔍 Code Logic Analysis and Compatibility
-
-### Implementation Status
-
-The Go collector implementation provides comprehensive compatibility with the
Java version:
-
-#### ✅ **Fully Implemented Features**
-
-1. **Transport Layer Compatibility**
-
- - **Netty Protocol**: Complete implementation with length-prefixed message
format
- - **gRPC Protocol**: Full gRPC service implementation with bidirectional
streaming
- - **Message Types**: All core message types (HEARTBEAT, GO_ONLINE,
GO_OFFLINE, etc.) are supported
- - **Request/Response Pattern**: Proper handling of synchronous and
asynchronous communication
-
-2. **Connection Management**
-
- - **Auto-reconnection**: Robust reconnection logic when connection is lost
- - **Connection Monitoring**: Background health checks with deadline
management
- - **Event System**: Comprehensive event handling for connection state
changes
- - **Heartbeat Mechanism**: Regular heartbeat messages for connection
maintenance
-
-3. **Message Processing**
-
- - **Processor Registry**: Dynamic message processor registration and
dispatch
- - **Response Correlation**: Proper request-response matching using identity
field
- - **Error Handling**: Comprehensive error handling throughout the message
pipeline
- - **Timeout Management**: Configurable timeouts for all operations
-
-4. **Protocol Compatibility**
- - **Protobuf Messages**: Exact compatibility with Java protobuf definitions
- - **Message Serialization**: Proper binary format handling for Netty
protocol
- - **Stream Processing**: Support for both unary and streaming gRPC
operations
-
-#### ⚠️ **Areas for Improvement**
-
-1. **Task Processing Logic**
-
- - Current implementation returns placeholder responses for task processing
- - Actual collection logic needs to be implemented based on specific
requirements
- - Task scheduling and execution engine needs integration
-
-2. **Configuration Management**
-
- - Configuration file format needs to be standardized with Java version
- - Environment variable support could be enhanced
- - Dynamic configuration reloading could be added
-
-3. **Monitoring and Metrics**
- - Comprehensive metrics collection could be added
- - Performance monitoring integration could be enhanced
- - Health check endpoints could be exposed
-
-#### 🔧 **Technical Implementation Details**
-
-1. **Netty Protocol Implementation**
-
- ```go
- // Length-prefixed message format for Java compatibility
- func (c *NettyClient) writeMessage(msg *pb.Message) error {
- data, err := proto.Marshal(msg)
- if err != nil {
- return fmt.Errorf("failed to marshal message: %w", err)
- }
- // Write length prefix (varint32)
- length := len(data)
- if err := binary.Write(c.writer, binary.BigEndian, uint32(length)); err
!= nil {
- return fmt.Errorf("failed to write length: %w", err)
- }
- // Write message data
- if _, err := c.writer.Write(data); err != nil {
- return fmt.Errorf("failed to write message: %w", err)
- }
- return c.writer.Flush()
- }
- ```
-
-2. **Response Future Pattern**
-
- ```go
- // Synchronous communication using ResponseFuture
- func (c *NettyClient) SendMsgSync(msg interface{}, timeoutMillis int)
(interface{}, error) {
- // Create response future for this request
- future := NewResponseFuture()
- c.responseTable[pbMsg.Identity] = future
- defer delete(c.responseTable, pbMsg.Identity)
-
- // Send message
- if err := c.writeMessage(pbMsg); err != nil {
- future.PutError(err)
- return nil, err
- }
-
- // Wait for response with timeout
- return future.Wait(time.Duration(timeoutMillis) * time.Millisecond)
- }
- ```
-
-3. **Event-Driven Architecture**
-
- ```go
- // Connection event handling
- func (c *NettyClient) triggerEvent(eventType EventType, err error) {
- if c.eventHandler != nil {
- c.eventHandler(Event{
- Type: eventType,
- Address: c.addr,
- Error: err,
- })
- }
- }
- ```
-
-#### 🎯 **Compatibility Assessment**
-
-The Go implementation achieves **high compatibility** with the Java version:
-
-- **Protocol Level**: 100% compatible with Netty message format
-- **Message Types**: All core message types implemented
-- **Communication Patterns**: Both sync and async patterns supported
-- **Connection Management**: Robust connection handling with auto-recovery
-- **Error Handling**: Comprehensive error handling throughout
-
-#### 📋 **Recommendations**
-
-1. **For Production Use**:
-
- - Implement actual task processing logic based on specific monitoring
requirements
- - Add comprehensive logging and monitoring
- - Implement configuration validation and management
- - Add integration tests with Java server
-
-2. **For Development**:
-
- - The current implementation provides a solid foundation
- - All core communication patterns are correctly implemented
- - Protocol compatibility is thoroughly addressed
- - Extensibility is built into the architecture
-
-3. **Testing Strategy**:
- - Test with actual Java server deployment
- - Verify message format compatibility
- - Test connection recovery scenarios
- - Validate performance under load
-
-The Go collector implementation successfully recreates the core communication
capabilities of the Java version, providing a solid foundation for HertzBeat
monitoring data collection in Go.
-
-## 🛠️ Contributing
+## Contributing
Contributions are welcome! Please see [CONTRIBUTING.md](CONTRIBUTING.md) for
details, including code, documentation, tests, and discussions.
-## 📄 License
+## License
This project is licensed under the [Apache 2.0 License](LICENSE).
-## 🌐 中文版本
+## 中文版本
For Chinese documentation, please see [README-CN.md](README-CN.md).
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]