This is an automated email from the ASF dual-hosted git repository.
zhaoqingran pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
The following commit(s) were added to refs/heads/main by this push:
new 652f928 feat: optimize cfg and binary start (#36)
652f928 is described below
commit 652f928e339a6c39099f55bdd9cd2036c94a7c22
Author: shown <[email protected]>
AuthorDate: Wed Dec 17 17:47:19 2025 +0800
feat: optimize cfg and binary start (#36)
---
etc/hertzbeat-collector.yaml | 2 +-
go.mod | 1 +
go.sum | 11 +-
internal/config/config.go | 278 +++++++++++++++++++++-----
internal/config/config_factory.go | 353 ----------------------------------
internal/config/env_config.go | 84 --------
internal/config/unified_config.go | 114 -----------
internal/constants/const.go | 80 ++++----
internal/transport/netty_client.go | 27 +--
internal/transport/processors.go | 8 +-
internal/transport/transport.go | 166 ++++++++--------
internal/types/config/config_types.go | 15 +-
internal/util/logger/logger.go | 1 +
13 files changed, 395 insertions(+), 745 deletions(-)
diff --git a/etc/hertzbeat-collector.yaml b/etc/hertzbeat-collector.yaml
index e67a1cd..5e87865 100644
--- a/etc/hertzbeat-collector.yaml
+++ b/etc/hertzbeat-collector.yaml
@@ -22,7 +22,7 @@ collector:
port: 8080
log:
- level: debug
+ level: info
# Transport/Manager configuration
manager:
diff --git a/go.mod b/go.mod
index df2342e..d840a0e 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ require (
github.com/alicebob/miniredis/v2 v2.35.0
github.com/apache/arrow/go/v13 v13.0.0
github.com/expr-lang/expr v1.16.9
+ github.com/fsnotify/fsnotify v1.4.7
github.com/go-logr/logr v1.4.3
github.com/go-logr/zapr v1.3.0
github.com/go-sql-driver/mysql v1.9.3
diff --git a/go.sum b/go.sum
index 5ba8df6..d52801f 100644
--- a/go.sum
+++ b/go.sum
@@ -28,11 +28,11 @@ github.com/armon/consul-api
v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5
github.com/aymerick/raymond
v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod
h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod
h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
-github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod
h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/bsm/ginkgo/v2 v2.12.0
h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod
h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod
h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
+github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod
h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.3.0
h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod
h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
@@ -58,6 +58,8 @@ github.com/davecgh/go-spew
v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger v1.6.0/go.mod
h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod
h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f
h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod
h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dustin/go-humanize v1.0.0/go.mod
h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod
h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod
h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
@@ -66,12 +68,11 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod
h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/go-control-plane
v0.9.9-0.20210217033140-668b12f5399d/go.mod
h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod
h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/etcd-io/bbolt v1.3.3/go.mod
h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
-github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f
h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
-github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod
h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/expr-lang/expr v1.16.9
h1:WUAzmR0JNI9JCiF0/ewwHB1gmcGw5wW7nWt8gc6PpCI=
github.com/expr-lang/expr v1.16.9/go.mod
h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
github.com/fasthttp-contrib/websocket
v0.0.0-20160511215533-1f3b11f56072/go.mod
h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8=
github.com/fatih/structs v1.1.0/go.mod
h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
+github.com/fsnotify/fsnotify v1.4.7
h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod
h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gavv/httpexpect v2.0.0+incompatible/go.mod
h1:x+9tiU1YnrOvnB725RkpoLv1M62hOWzwo5OXotisrKc=
github.com/getsentry/sentry-go v0.12.0
h1:era7g0re5iY13bHSdN/xMkyV+5zZppjRVQhZrXCaEIk=
@@ -256,11 +257,11 @@ github.com/prometheus/common v0.65.0
h1:QDwzd+G1twt//Kwj/Ww6E9FQq1iVMmODnILtW1t2
github.com/prometheus/common v0.65.0/go.mod
h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8=
github.com/prometheus/procfs v0.16.1
h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
github.com/prometheus/procfs v0.16.1/go.mod
h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
+github.com/redis/go-redis/v9 v9.17.1
h1:7tl732FjYPRT9H9aNfyTwKg9iTETjWjGKEJ2t/5iWTs=
+github.com/redis/go-redis/v9 v9.17.1/go.mod
h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
github.com/rogpeppe/go-internal v1.6.1/go.mod
h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.8.1/go.mod
h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o=
github.com/rogpeppe/go-internal v1.9.0/go.mod
h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
-github.com/redis/go-redis/v9 v9.17.1
h1:7tl732FjYPRT9H9aNfyTwKg9iTETjWjGKEJ2t/5iWTs=
-github.com/redis/go-redis/v9 v9.17.1/go.mod
h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
github.com/rogpeppe/go-internal v1.13.1
h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod
h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/russross/blackfriday v1.5.2/go.mod
h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
diff --git a/internal/config/config.go b/internal/config/config.go
index a5a299f..0770e44 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -18,110 +18,290 @@
package config
import (
+ "context"
"errors"
+ "fmt"
"os"
+ "path/filepath"
+ "sync"
+ "sync/atomic"
+ "time"
+ "github.com/fsnotify/fsnotify"
"gopkg.in/yaml.v3"
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
+
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+ cfgtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
collectortypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/err"
loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
-const (
- DefaultHertzBeatCollectorName = "hertzbeat-collector"
+var (
+ globalConfig atomic.Pointer[cfgtypes.CollectorConfig]
+ configMu sync.RWMutex
)
-// Loader handles file-based configuration loading
-// It uses the common ConfigFactory for consistency
+// Loader handles file-based configuration loading with hot-reload support
type Loader struct {
cfgPath string
- factory *ConfigFactory
logger logger.Logger
}
+// New creates a new configuration loader
func New(cfgPath string) *Loader {
+
return &Loader{
cfgPath: cfgPath,
- factory: NewConfigFactory(),
+ logger: logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo).WithName("config-loader"),
}
}
-func (l *Loader) LoadConfig() (*config.CollectorConfig, error) {
- l.logger = logger.DefaultLogger(os.Stdout,
loggertypes.LogLevelInfo).WithName("config-loader")
-
+// LoadConfig loads configuration from file
+func (l *Loader) LoadConfig() (*cfgtypes.CollectorConfig, error) {
if l.cfgPath == "" {
- l.logger.Info("file-config-loader: path is empty, using
defaults")
- return l.factory.CreateDefaultConfig(), nil
+ err := errors.New("config path is required")
+ l.logger.Error(err, "config path is empty")
+ return nil, err
}
- if _, err := os.Stat(l.cfgPath); os.IsNotExist(err) {
- l.logger.Error(err, "file-config-loader: file not exist",
"path", l.cfgPath)
+ cfg, err := l.parseConfigFile(l.cfgPath)
+ if err != nil {
return nil, err
}
- file, err := os.Open(l.cfgPath)
+ l.logger.Info("configuration loaded successfully", "path", l.cfgPath)
+ return cfg, nil
+}
+
+// parseConfigFile parses the YAML config file
+func (l *Loader) parseConfigFile(path string) (*cfgtypes.CollectorConfig, error) {
+ // Resolve symlinks to handle Kubernetes ConfigMap mounts
+ resolved, err := filepath.EvalSymlinks(path)
if err != nil {
+ resolved = path
+ }
+
+ if _, err := os.Stat(resolved); os.IsNotExist(err) {
+ l.logger.Error(err, "config file not exist", "path", resolved)
+ return nil, err
+ }
+
+ data, err := os.ReadFile(resolved)
+ if err != nil {
+ l.logger.Error(err, "failed to read config file", "path", resolved)
return nil, err
}
- defer func(file *os.File) {
- err := file.Close()
- if err != nil {
- l.logger.Error(err, "close config file failed")
- }
- }(file)
- var cfg config.CollectorConfig
- decoder := yaml.NewDecoder(file)
- if err := decoder.Decode(&cfg); err != nil {
- l.logger.Error(err, "decode config file failed")
+ var cfg cfgtypes.CollectorConfig
+ if err := yaml.Unmarshal(data, &cfg); err != nil {
+ l.logger.Error(err, "failed to parse config file", "path", resolved)
return nil, err
}
- // Fill in any missing fields with defaults
- filledCfg := l.factory.MergeWithEnv(&cfg)
+ // Apply default values
+ // when some fields are missing in config file
+ l.applyDefaults(&cfg)
- l.logger.Info("Configuration loaded from file", "path", l.cfgPath)
- return filledCfg, nil
+ return &cfg, nil
}
-func (l *Loader) ValidateConfig(cfg *config.CollectorConfig) error {
- if l.factory != nil {
- return l.factory.ValidateConfig(cfg)
+// applyDefaults fills in missing configuration with defaults
+func (l *Loader) applyDefaults(cfg *cfgtypes.CollectorConfig) {
+ if cfg.Collector.Info.Name == "" {
+ cfg.Collector.Info.Name = constants.DefaultHertzBeatCollectorName
}
+ if cfg.Collector.Log.Level == "" {
+ cfg.Collector.Log.Level = string(loggertypes.LogLevelInfo)
+ }
+ if cfg.Collector.Manager.Protocol == "" {
+ cfg.Collector.Manager.Protocol = "http"
+ }
+}
- // Fallback validation if factory is not available
+// ValidateConfig validates the configuration
+func (l *Loader) ValidateConfig(cfg *cfgtypes.CollectorConfig) error {
if cfg == nil {
- l.logger.Error(collectortypes.CollectorConfigIsNil,
"config-loader is nil")
- return errors.New("config-loader is nil")
+ err := errors.New("config is nil")
+ l.logger.Error(collectortypes.CollectorConfigIsNil, "config validation failed")
+ return err
}
- // other check
if cfg.Collector.Info.IP == "" {
- l.logger.Error(collectortypes.CollectorIPIsNil, "collector ip
is empty")
- return errors.New("config-loader ip is empty")
+ err := errors.New("collector ip is empty")
+ l.logger.Error(collectortypes.CollectorIPIsNil, "config validation failed")
+ return err
}
if cfg.Collector.Info.Port == "" {
- l.logger.Error(collectortypes.CollectorPortIsNil,
"config-loader: port is empty")
- return errors.New("config-loader port is empty")
+ err := errors.New("collector port is empty")
+ l.logger.Error(collectortypes.CollectorPortIsNil, "config validation failed")
+ return err
}
if cfg.Collector.Info.Name == "" {
- l.logger.Sugar().Debug("config-loader: name is empty")
- cfg.Collector.Info.Name = DefaultHertzBeatCollectorName
+ cfg.Collector.Info.Name = constants.DefaultHertzBeatCollectorName
+ l.logger.Sugar().Debug("collector name is empty, using default")
}
return nil
}
-// GetManagerAddress returns the full manager address using the factory
-func (l *Loader) GetManagerAddress(cfg *config.CollectorConfig) string {
- return l.factory.GetManagerAddress(cfg)
+// GetManagerAddress returns the full manager address
+func (l *Loader) GetManagerAddress(cfg *cfgtypes.CollectorConfig) string {
+ if cfg == nil || cfg.Collector.Manager.Host == "" {
+ return ""
+ }
+ protocol := cfg.Collector.Manager.Protocol
+ if protocol == "" {
+ protocol = "http"
+ }
+ return fmt.Sprintf("%s://%s:%s", protocol, cfg.Collector.Manager.Host, cfg.Collector.Manager.Port)
+}
+
+// PrintConfig prints the configuration
+func (l *Loader) PrintConfig(cfg *cfgtypes.CollectorConfig) {
+ if cfg == nil {
+ l.logger.Info("config is nil")
+ return
+ }
+ l.logger.Info("current configuration",
+ "name", cfg.Collector.Info.Name,
+ "ip", cfg.Collector.Info.IP,
+ "port", cfg.Collector.Info.Port,
+ "log_level", cfg.Collector.Log.Level,
+ "manager", l.GetManagerAddress(cfg),
+ )
+}
+
+// GetGlobalConfig returns the current global configuration
+func GetGlobalConfig() *cfgtypes.CollectorConfig {
+ return globalConfig.Load()
+}
+
+// SetGlobalConfig sets the global configuration
+func SetGlobalConfig(cfg *cfgtypes.CollectorConfig) {
+ configMu.Lock()
+ defer configMu.Unlock()
+ globalConfig.Store(cfg)
+}
+
+// WatchConfigAndReload watches the config file and reloads on changes
+// This function implements hot-reload for both configuration and logging
+func (l *Loader) WatchConfigAndReload(ctx context.Context) error {
+ watcher, err := fsnotify.NewWatcher()
+ if err != nil {
+ l.logger.Error(err, "failed to create config watcher")
+ return err
+ }
+ defer watcher.Close()
+
+ // Watch both the file and its directory to handle symlink swaps (Kubernetes ConfigMap)
+ cfgFile := l.cfgPath
+ cfgDir := filepath.Dir(cfgFile)
+
+ if err := watcher.Add(cfgDir); err != nil {
+ l.logger.Error(err, "failed to watch config directory", "dir", cfgDir)
+ return err
+ }
+
+ // Try to watch the file directly (best-effort)
+ _ = watcher.Add(cfgFile)
+
+ l.logger.Info("config file watcher started", "path", cfgFile)
+
+ // Debounce events
+ var (
+ pending bool
+ last time.Time
+ )
+
+ reload := func() {
+ l.logger.Info("config file changed, reloading...")
+
+ // Parse new configuration
+ newCfg, err := l.parseConfigFile(cfgFile)
+ if err != nil {
+ l.logger.Error(err, "failed to reload config")
+ return
+ }
+
+ // Validate new configuration
+ if err := l.ValidateConfig(newCfg); err != nil {
+ l.logger.Error(err, "invalid config after reload")
+ return
+ }
+
+ // Update global configuration
+ SetGlobalConfig(newCfg)
+
+ // Hot-reload logging configuration
+ if err := l.reloadLogging(newCfg); err != nil {
+ l.logger.Error(err, "failed to reload logging")
+ }
+
+ l.logger.Info("configuration reloaded successfully")
+ }
+
+ for {
+ select {
+ case <-ctx.Done():
+ l.logger.Info("config watcher stopped")
+ return ctx.Err()
+
+ case event, ok := <-watcher.Events:
+ if !ok {
+ return nil
+ }
+
+ // Handle Write, Create, Remove, Rename, Chmod events
+ if event.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Remove|fsnotify.Rename|fsnotify.Chmod) != 0 {
+ // Check if the event pertains to the config file or directory
+ if filepath.Base(event.Name) == filepath.Base(cfgFile) || filepath.Dir(event.Name) == cfgDir {
+ // Debounce: if not pending or enough time has passed
+ if !pending || time.Since(last) > 250*time.Millisecond {
+ pending = true
+ last = time.Now()
+ // Slight delay to let file settle
+ go func() {
+ time.Sleep(300 * time.Millisecond)
+ reload()
+ pending = false
+ }()
+ }
+ }
+ }
+
+ case err, ok := <-watcher.Errors:
+ if !ok {
+ return nil
+ }
+ l.logger.Error(err, "config watcher error")
+ }
+ }
}
-// PrintConfig prints the configuration using the factory
-func (l *Loader) PrintConfig(cfg *config.CollectorConfig) {
- l.factory.PrintConfig(cfg)
+// reloadLogging reloads the logging configuration
+// when config file changes, it helps dynamically
+// adjust the log level for dynamic debugging
+func (l *Loader) reloadLogging(cfg *cfgtypes.CollectorConfig) error {
+
+ if cfg == nil {
+ return errors.New("config is nil")
+ }
+
+ // Parse log level from config
+ level := loggertypes.LogLevel(cfg.Collector.Log.Level)
+ if level == "" {
+ level = loggertypes.LogLevelInfo
+ }
+
+ // Create new logger with updated level
+ newLogger := logger.DefaultLogger(os.Stdout, level).WithName("config-loader")
+
+ // Update loader's logger
+ l.logger = newLogger
+
+ l.logger.Info("logging configuration reloaded", "level", level)
+ return nil
}
diff --git a/internal/config/config_factory.go
b/internal/config/config_factory.go
deleted file mode 100644
index 7854d4f..0000000
--- a/internal/config/config_factory.go
+++ /dev/null
@@ -1,353 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package config
-
-import (
- "fmt"
- "os"
- "strconv"
- "strings"
-
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
- loggertypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
-)
-
-// ConfigDefaults contains all default configuration values
-type ConfigDefaults struct {
- Identity string
- Mode string
- CollectorName string
- CollectorIP string
- CollectorPort string
- ManagerHost string
- ManagerPort string
- ManagerProtocol string
- LogLevel string
-}
-
-// GetDefaultConfig returns the default configuration values
-func GetDefaultConfig() *ConfigDefaults {
- return &ConfigDefaults{
- Identity: "hertzbeat-collector-go",
- Mode: "public",
- CollectorName: "hertzbeat-collector-go",
- CollectorIP: "127.0.0.1",
- CollectorPort: "8080",
- ManagerHost: "127.0.0.1",
- ManagerPort: "1158",
- ManagerProtocol: "netty",
- LogLevel: "info",
- }
-}
-
-// ConfigFactory provides factory methods for creating configurations
-type ConfigFactory struct {
- defaults *ConfigDefaults
- logger logger.Logger
-}
-
-// NewConfigFactory creates a new configuration factory
-func NewConfigFactory() *ConfigFactory {
- return &ConfigFactory{
- defaults: GetDefaultConfig(),
- logger: logger.DefaultLogger(os.Stdout,
loggertypes.LogLevelInfo).WithName("config-factory"),
- }
-}
-
-// CreateDefaultConfig creates a configuration with default values
-func (f *ConfigFactory) CreateDefaultConfig() *config.CollectorConfig {
- return &config.CollectorConfig{
- Collector: config.CollectorSection{
- Info: config.CollectorInfo{
- Name: f.defaults.CollectorName,
- IP: f.defaults.CollectorIP,
- Port: f.defaults.CollectorPort,
- },
- Log: config.CollectorLogConfig{
- Level: f.defaults.LogLevel,
- },
- Manager: config.ManagerConfig{
- Host: f.defaults.ManagerHost,
- Port: f.defaults.ManagerPort,
- Protocol: f.defaults.ManagerProtocol,
- },
- Identity: f.defaults.Identity,
- Mode: f.defaults.Mode,
- },
- }
-}
-
-// CreateFromEnv creates configuration from environment variables with
fallback to defaults
-func (f *ConfigFactory) CreateFromEnv() *config.CollectorConfig {
- cfg := f.CreateDefaultConfig()
-
- // Override with environment variables if they exist
- if identity := os.Getenv("IDENTITY"); identity != "" {
- cfg.Collector.Identity = identity
- }
-
- if mode := os.Getenv("MODE"); mode != "" {
- cfg.Collector.Mode = mode
- }
-
- if name := os.Getenv("COLLECTOR_NAME"); name != "" {
- cfg.Collector.Info.Name = name
- }
-
- if ip := os.Getenv("COLLECTOR_IP"); ip != "" {
- cfg.Collector.Info.IP = ip
- }
-
- if port := os.Getenv("COLLECTOR_PORT"); port != "" {
- cfg.Collector.Info.Port = port
- }
-
- if host := os.Getenv("MANAGER_HOST"); host != "" {
- cfg.Collector.Manager.Host = host
- }
-
- if port := os.Getenv("MANAGER_PORT"); port != "" {
- cfg.Collector.Manager.Port = port
- }
-
- if protocol := os.Getenv("MANAGER_PROTOCOL"); protocol != "" {
- cfg.Collector.Manager.Protocol = protocol
- }
-
- if level := os.Getenv("LOG_LEVEL"); level != "" {
- cfg.Collector.Log.Level = level
- }
-
- f.logger.Info("Configuration created from environment variables",
- "identity", cfg.Collector.Identity,
- "mode", cfg.Collector.Mode,
- "manager", fmt.Sprintf("%s:%s", cfg.Collector.Manager.Host,
cfg.Collector.Manager.Port))
-
- return cfg
-}
-
-// MergeWithEnv merges file configuration with environment variable overrides
-func (f *ConfigFactory) MergeWithEnv(fileCfg *config.CollectorConfig)
*config.CollectorConfig {
- if fileCfg == nil {
- return f.CreateFromEnv()
- }
-
- // Start with file configuration
- cfg := *fileCfg
-
- // Override with environment variables if they exist
- if identity := os.Getenv("IDENTITY"); identity != "" {
- cfg.Collector.Identity = identity
- }
-
- if mode := os.Getenv("MODE"); mode != "" {
- cfg.Collector.Mode = mode
- }
-
- if name := os.Getenv("COLLECTOR_NAME"); name != "" {
- cfg.Collector.Info.Name = name
- }
-
- if ip := os.Getenv("COLLECTOR_IP"); ip != "" {
- cfg.Collector.Info.IP = ip
- }
-
- if port := os.Getenv("COLLECTOR_PORT"); port != "" {
- cfg.Collector.Info.Port = port
- }
-
- if host := os.Getenv("MANAGER_HOST"); host != "" {
- cfg.Collector.Manager.Host = host
- }
-
- if port := os.Getenv("MANAGER_PORT"); port != "" {
- cfg.Collector.Manager.Port = port
- }
-
- if protocol := os.Getenv("MANAGER_PROTOCOL"); protocol != "" {
- cfg.Collector.Manager.Protocol = protocol
- }
-
- if level := os.Getenv("LOG_LEVEL"); level != "" {
- cfg.Collector.Log.Level = level
- }
-
- // Fill in defaults for any empty fields
- f.fillDefaults(&cfg)
-
- f.logger.Info("Configuration merged with environment variables")
- return &cfg
-}
-
-// fillDefaults fills in default values for any empty fields
-func (f *ConfigFactory) fillDefaults(cfg *config.CollectorConfig) {
- if cfg.Collector.Identity == "" {
- cfg.Collector.Identity = f.defaults.Identity
- }
-
- if cfg.Collector.Mode == "" {
- cfg.Collector.Mode = f.defaults.Mode
- }
-
- if cfg.Collector.Info.Name == "" {
- cfg.Collector.Info.Name = f.defaults.CollectorName
- }
-
- if cfg.Collector.Info.IP == "" {
- cfg.Collector.Info.IP = f.defaults.CollectorIP
- }
-
- if cfg.Collector.Info.Port == "" {
- cfg.Collector.Info.Port = f.defaults.CollectorPort
- }
-
- if cfg.Collector.Manager.Host == "" {
- cfg.Collector.Manager.Host = f.defaults.ManagerHost
- }
-
- if cfg.Collector.Manager.Port == "" {
- cfg.Collector.Manager.Port = f.defaults.ManagerPort
- }
-
- if cfg.Collector.Manager.Protocol == "" {
- cfg.Collector.Manager.Protocol = f.defaults.ManagerProtocol
- }
-
- if cfg.Collector.Log.Level == "" {
- cfg.Collector.Log.Level = f.defaults.LogLevel
- }
-}
-
-// ValidateConfig validates the configuration
-func (f *ConfigFactory) ValidateConfig(cfg *config.CollectorConfig) error {
- if cfg == nil {
- return fmt.Errorf("configuration is nil")
- }
-
- // Validate required fields
- if cfg.Collector.Identity == "" {
- return fmt.Errorf("collector identity is required")
- }
-
- if cfg.Collector.Mode == "" {
- return fmt.Errorf("collector mode is required")
- }
-
- if cfg.Collector.Manager.Host == "" {
- return fmt.Errorf("manager host is required")
- }
-
- if cfg.Collector.Manager.Port == "" {
- return fmt.Errorf("manager port is required")
- }
-
- // Validate mode
- validModes := map[string]bool{
- "public": true,
- "private": true,
- }
- if !validModes[strings.ToLower(cfg.Collector.Mode)] {
- return fmt.Errorf("invalid mode: %s, must be 'public' or
'private'", cfg.Collector.Mode)
- }
-
- // Validate protocol
- validProtocols := map[string]bool{
- "netty": true,
- "grpc": true,
- }
- if cfg.Collector.Manager.Protocol != "" &&
!validProtocols[strings.ToLower(cfg.Collector.Manager.Protocol)] {
- return fmt.Errorf("invalid protocol: %s, must be 'netty' or
'grpc'", cfg.Collector.Manager.Protocol)
- }
-
- // Validate port numbers
- if _, err := strconv.Atoi(cfg.Collector.Info.Port); err != nil {
- return fmt.Errorf("invalid collector port: %s",
cfg.Collector.Info.Port)
- }
-
- if _, err := strconv.Atoi(cfg.Collector.Manager.Port); err != nil {
- return fmt.Errorf("invalid manager port: %s",
cfg.Collector.Manager.Port)
- }
-
- f.logger.Info("Configuration validation passed")
- return nil
-}
-
-// GetManagerAddress returns the full manager address (host:port)
-func (f *ConfigFactory) GetManagerAddress(cfg *config.CollectorConfig) string {
- if cfg == nil || cfg.Collector.Manager.Host == "" ||
cfg.Collector.Manager.Port == "" {
- return fmt.Sprintf("%s:%s", f.defaults.ManagerHost,
f.defaults.ManagerPort)
- }
- return fmt.Sprintf("%s:%s", cfg.Collector.Manager.Host,
cfg.Collector.Manager.Port)
-}
-
-// IsPublicMode checks if the collector is running in public mode
-func (f *ConfigFactory) IsPublicMode(cfg *config.CollectorConfig) bool {
- if cfg == nil {
- return strings.ToLower(f.defaults.Mode) == "public"
- }
- return strings.ToLower(cfg.Collector.Mode) == "public"
-}
-
-// IsPrivateMode checks if the collector is running in private mode
-func (f *ConfigFactory) IsPrivateMode(cfg *config.CollectorConfig) bool {
- if cfg == nil {
- return strings.ToLower(f.defaults.Mode) == "private"
- }
- return strings.ToLower(cfg.Collector.Mode) == "private"
-}
-
-// PrintConfig prints the configuration in a readable format
-func (f *ConfigFactory) PrintConfig(cfg *config.CollectorConfig) {
- if cfg == nil {
- f.logger.Error(fmt.Errorf("configuration is nil"),
"Configuration is nil")
- return
- }
-
- f.logger.Info("=== Collector Configuration ===")
- f.logger.Info("Identity", "value", cfg.Collector.Identity)
- f.logger.Info("Mode", "value", cfg.Collector.Mode)
- f.logger.Info("Collector Name", "value", cfg.Collector.Info.Name)
- f.logger.Info("Collector Address", "value", fmt.Sprintf("%s:%s",
cfg.Collector.Info.IP, cfg.Collector.Info.Port))
- f.logger.Info("Manager Address", "value", fmt.Sprintf("%s:%s",
cfg.Collector.Manager.Host, cfg.Collector.Manager.Port))
- f.logger.Info("Manager Protocol", "value",
cfg.Collector.Manager.Protocol)
- f.logger.Info("Log Level", "value", cfg.Collector.Log.Level)
- f.logger.Info("===============================")
-}
-
-// === Configuration Entry Points ===
-
-// LoadFromFile loads configuration from file only
-func LoadFromFile(cfgPath string) (*config.CollectorConfig, error) {
- loader := New(cfgPath)
- return loader.LoadConfig()
-}
-
-// LoadFromEnv loads configuration from environment variables only
-func LoadFromEnv() *config.CollectorConfig {
- envLoader := NewEnvConfigLoader()
- return envLoader.LoadFromEnv()
-}
-
-// LoadUnified loads configuration with file + env priority (recommended)
-func LoadUnified(cfgPath string) (*config.CollectorConfig, error) {
- unifiedLoader := NewUnifiedConfigLoader(cfgPath)
- return unifiedLoader.Load()
-}
diff --git a/internal/config/env_config.go b/internal/config/env_config.go
deleted file mode 100644
index 40c09c1..0000000
--- a/internal/config/env_config.go
+++ /dev/null
@@ -1,84 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package config
-
-import (
- "os"
-
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
- loggertypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
-)
-
-// EnvConfigLoader handles environment variable configuration
-// It uses the common ConfigFactory for consistency
-type EnvConfigLoader struct {
- factory *ConfigFactory
- logger logger.Logger
-}
-
-// NewEnvConfigLoader creates a new environment variable configuration loader
-func NewEnvConfigLoader() *EnvConfigLoader {
- return &EnvConfigLoader{
- factory: NewConfigFactory(),
- logger: logger.DefaultLogger(os.Stdout,
loggertypes.LogLevelInfo).WithName("env-config-loader"),
- }
-}
-
-// LoadFromEnv loads configuration from environment variables with defaults
-func (l *EnvConfigLoader) LoadFromEnv() *config.CollectorConfig {
- cfg := l.factory.CreateFromEnv()
- l.logger.Info("Configuration loaded from environment variables")
- return cfg
-}
-
-// LoadWithDefaults loads configuration with environment variables overriding
provided defaults
-func (l *EnvConfigLoader) LoadWithDefaults(defaultCfg *config.CollectorConfig)
*config.CollectorConfig {
- if defaultCfg == nil {
- return l.LoadFromEnv()
- }
-
- cfg := l.factory.MergeWithEnv(defaultCfg)
- l.logger.Info("Configuration loaded with environment variable
overrides")
- return cfg
-}
-
-// ValidateEnvConfig validates the environment configuration
-func (l *EnvConfigLoader) ValidateEnvConfig(cfg *config.CollectorConfig) error
{
- return l.factory.ValidateConfig(cfg)
-}
-
-// PrintEnvConfig prints the current environment configuration
-func (l *EnvConfigLoader) PrintEnvConfig(cfg *config.CollectorConfig) {
- l.factory.PrintConfig(cfg)
-}
-
-// GetManagerAddress returns the full manager address (host:port)
-func (l *EnvConfigLoader) GetManagerAddress(cfg *config.CollectorConfig)
string {
- return l.factory.GetManagerAddress(cfg)
-}
-
-// IsPublicMode checks if the collector is running in public mode
-func (l *EnvConfigLoader) IsPublicMode(cfg *config.CollectorConfig) bool {
- return l.factory.IsPublicMode(cfg)
-}
-
-// IsPrivateMode checks if the collector is running in private mode
-func (l *EnvConfigLoader) IsPrivateMode(cfg *config.CollectorConfig) bool {
- return l.factory.IsPrivateMode(cfg)
-}
diff --git a/internal/config/unified_config.go
b/internal/config/unified_config.go
deleted file mode 100644
index 2640448..0000000
--- a/internal/config/unified_config.go
+++ /dev/null
@@ -1,114 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package config
-
-import (
- "os"
-
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
- loggertypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
-)
-
-// UnifiedConfigLoader provides a unified configuration loading interface
-// that loads from file first, then overrides with environment variables
-// It uses the common ConfigFactory for consistency
-type UnifiedConfigLoader struct {
- fileLoader *Loader
- envLoader *EnvConfigLoader
- factory *ConfigFactory
- logger logger.Logger
-}
-
-// NewUnifiedConfigLoader creates a new unified configuration loader
-func NewUnifiedConfigLoader(cfgPath string) *UnifiedConfigLoader {
- return &UnifiedConfigLoader{
- fileLoader: New(cfgPath),
- envLoader: NewEnvConfigLoader(),
- factory: NewConfigFactory(),
- logger: logger.DefaultLogger(os.Stdout,
loggertypes.LogLevelInfo).WithName("unified-config-loader"),
- }
-}
-
-// Load loads configuration from file and environment variables
-// Environment variables take precedence over file configuration
-func (l *UnifiedConfigLoader) Load() (*config.CollectorConfig, error) {
- var cfg *config.CollectorConfig
- var err error
-
- // Try to load from file first
- if l.fileLoader.cfgPath != "" {
- if cfg, err = l.fileLoader.LoadConfig(); err != nil {
- l.logger.Error(err, "Failed to load configuration from
file, falling back to environment variables only")
- cfg = nil
- } else {
- l.logger.Info("Configuration loaded from file", "file",
l.fileLoader.cfgPath)
- }
- }
-
- // If file loading failed or no file specified, start with defaults
- if cfg == nil {
- cfg = l.factory.CreateDefaultConfig()
- l.logger.Info("Using default configuration")
- }
-
- // Merge with environment variables (env takes precedence)
- finalCfg := l.factory.MergeWithEnv(cfg)
-
- // Validate the final configuration
- if err := l.factory.ValidateConfig(finalCfg); err != nil {
- l.logger.Error(err, "Configuration validation failed")
- return nil, err
- }
-
- l.logger.Info("Unified configuration loaded successfully")
- return finalCfg, nil
-}
-
-// LoadEnvOnly loads configuration from environment variables only
-func (l *UnifiedConfigLoader) LoadEnvOnly() *config.CollectorConfig {
- cfg := l.envLoader.LoadFromEnv()
- l.logger.Info("Configuration loaded from environment variables only")
- return cfg
-}
-
-// LoadFileOnly loads configuration from file only (without env overrides)
-func (l *UnifiedConfigLoader) LoadFileOnly() (*config.CollectorConfig, error) {
- if l.fileLoader.cfgPath == "" {
- cfg := l.factory.CreateDefaultConfig()
- l.logger.Info("No file specified, using default configuration")
- return cfg, nil
- }
-
- return l.fileLoader.LoadConfig()
-}
-
-// ValidateConfig validates the configuration using the factory
-func (l *UnifiedConfigLoader) ValidateConfig(cfg *config.CollectorConfig)
error {
- return l.factory.ValidateConfig(cfg)
-}
-
-// PrintConfig prints the configuration using the factory
-func (l *UnifiedConfigLoader) PrintConfig(cfg *config.CollectorConfig) {
- l.factory.PrintConfig(cfg)
-}
-
-// GetManagerAddress returns the manager address using the factory
-func (l *UnifiedConfigLoader) GetManagerAddress(cfg *config.CollectorConfig)
string {
- return l.factory.GetManagerAddress(cfg)
-}
diff --git a/internal/constants/const.go b/internal/constants/const.go
index d6ac8de..cd68a14 100644
--- a/internal/constants/const.go
+++ b/internal/constants/const.go
@@ -19,72 +19,76 @@
// ported from org.apache.hertzbeat.common.constants.CommonConstants
package constants
+const (
+ DefaultHertzBeatCollectorName = "hertzbeat-collector"
+)
+
// Collect Response status code (Go Collector specific)
const (
- CollectSuccess = 0 // Generic success
- CollectUnavailable = 1 // Service unavailable (e.g., target service
error)
- CollectUnReachable = 2 // Network unreachable (e.g., network timeout,
DNS fail)
- CollectUnConnectable = 3 // Connection failed (e.g., port closed, SSH
auth fail)
- CollectFail = 4 // Generic failure (e.g., query error, script
exec fail)
- CollectTimeout = 5 // Collection timeout
+ CollectSuccess = 0 // Generic success
+ CollectUnavailable = 1 // Service unavailable (e.g., target service error)
+ CollectUnReachable = 2 // Network unreachable (e.g., network timeout, DNS
fail)
+ CollectUnConnectable = 3 // Connection failed (e.g., port closed, SSH auth
fail)
+ CollectFail = 4 // Generic failure (e.g., query error, script exec
fail)
+ CollectTimeout = 5 // Collection timeout
)
// Field parameter types (Aligned with CommonConstants.java)
const (
- TYPE_NUMBER = 0 // Field parameter type: number
- TYPE_STRING = 1 // Field parameter type: String
- TYPE_SECRET = 2 // Field parameter type: encrypted string
- TYPE_TIME = 3 // Field parameter type: time
+ TYPE_NUMBER = 0 // Field parameter type: number
+ TYPE_STRING = 1 // Field parameter type: String
+ TYPE_SECRET = 2 // Field parameter type: encrypted string
+ TYPE_TIME = 3 // Field parameter type: time
)
// Monitoring status (Aligned with CommonConstants.java)
const (
- MONITOR_PAUSED_CODE = 0 // 0: Paused
- MONITOR_UP_CODE = 1 // 1: Up
- MONITOR_DOWN_CODE = 2 // 2: Down
+ MONITOR_PAUSED_CODE = 0 // 0: Paused
+ MONITOR_UP_CODE = 1 // 1: Up
+ MONITOR_DOWN_CODE = 2 // 2: Down
)
// Common metric/label keys
const (
- // Collection metric value: null placeholder for empty value
- NULL_VALUE = " "
+ // Collection metric value: null placeholder for empty value
+ NULL_VALUE = " "
- // Common metric keys
- ErrorMsg = "errorMsg"
- RESPONSE_TIME = "responseTime"
- StatusCode = "statusCode"
+ // Common metric keys
+ ErrorMsg = "errorMsg"
+ RESPONSE_TIME = "responseTime"
+ StatusCode = "statusCode"
- // Label keys (Aligned with CommonConstants.java)
- LABEL_INSTANCE = "instance"
- LABEL_DEFINE_ID = "defineid"
- LABEL_ALERT_NAME = "alertname"
- LABEL_INSTANCE_HOST = "instancehost"
- LABEL_INSTANCE_NAME = "instancename"
- LABEL_ALERT_SEVERITY = "severity"
+ // Label keys (Aligned with CommonConstants.java)
+ LABEL_INSTANCE = "instance"
+ LABEL_DEFINE_ID = "defineid"
+ LABEL_ALERT_NAME = "alertname"
+ LABEL_INSTANCE_HOST = "instancehost"
+ LABEL_INSTANCE_NAME = "instancename"
+ LABEL_ALERT_SEVERITY = "severity"
)
// Service specific constants
const (
- MongoDbAtlasModel = "mongodb-atlas"
+ MongoDbAtlasModel = "mongodb-atlas"
- // PostgreSQLUnReachAbleCode Specific SQLState for connection failure
- PostgreSQLUnReachAbleCode = "08001"
- // ZookeeperApp App name for Zookeeper
- ZookeeperApp = "zookeeper"
- // ZookeeperEnviHeader Header string in Zookeeper 'envi' command output
- ZookeeperEnviHeader = "Environment:"
+ // PostgreSQLUnReachAbleCode Specific SQLState for connection failure
+ PostgreSQLUnReachAbleCode = "08001"
+ // ZookeeperApp App name for Zookeeper
+ ZookeeperApp = "zookeeper"
+ // ZookeeperEnviHeader Header string in Zookeeper 'envi' command output
+ ZookeeperEnviHeader = "Environment:"
)
// Function related constants
const (
- CollectorModule = "collector"
+ CollectorModule = "collector"
)
// Legacy or alias constants
// These are kept for compatibility with previous Go code logic
const (
- // FieldTypeString Alias for TYPE_STRING
- FieldTypeString = TYPE_STRING
- // KeyWord Deprecated placeholder
- KeyWord = "keyword"
+ // FieldTypeString Alias for TYPE_STRING
+ FieldTypeString = TYPE_STRING
+ // KeyWord Deprecated placeholder
+ KeyWord = "keyword"
)
diff --git a/internal/transport/netty_client.go
b/internal/transport/netty_client.go
index ebaeade..ccdd0d0 100644
--- a/internal/transport/netty_client.go
+++ b/internal/transport/netty_client.go
@@ -434,27 +434,28 @@ func (c *NettyClient) connectionMonitor() {
if !connExists {
// Connection is nil, trigger disconnect event
c.triggerEvent(EventDisconnected, nil)
- log.Println("Connection lost, attempting to
reconnect...")
+ log.Println("Connection lost, starting reconnection
loop...")
_ = c.Shutdown()
- // Attempt to reconnect with exponential backoff
- for i := 0; i < 5; i++ {
+ // Attempt to reconnect continuously with fixed
3-second interval
+ attempt := 0
+ for c.IsStarted() {
+ attempt++
+ log.Printf("Reconnection attempt #%d, will
retry in 3 seconds...", attempt)
+ time.Sleep(3 * time.Second)
+
if !c.IsStarted() {
- break // Exit if shutdown was called
+ log.Println("Client shutdown requested,
stopping reconnection attempts")
+ break
}
- backoff := time.Duration(i+1) * 2 * time.Second
- log.Printf("Attempting to reconnect in %v...",
backoff)
- time.Sleep(backoff)
-
+ log.Printf("Reconnection attempt #%d: trying to
connect to %s...", attempt, c.addr)
if err := c.Start(); err == nil {
- log.Println("Reconnected successfully")
+ log.Printf("Reconnection attempt #%d:
SUCCESS - Connected to manager", attempt)
break
} else {
- log.Printf("Failed to reconnect: %v",
err)
- if i == 4 { // Last attempt
-
c.triggerEvent(EventConnectFailed, err)
- }
+ log.Printf("Reconnection attempt #%d:
FAILED - %v", attempt, err)
+ c.triggerEvent(EventConnectFailed, err)
}
}
}
diff --git a/internal/transport/processors.go b/internal/transport/processors.go
index d969f48..f989861 100644
--- a/internal/transport/processors.go
+++ b/internal/transport/processors.go
@@ -26,7 +26,7 @@ import (
pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
- loggertype
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
+ loggertypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/crypto"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
@@ -87,7 +87,7 @@ func NewGoOnlineProcessor(client *GrpcClient)
*GoOnlineProcessor {
}
func (p *GoOnlineProcessor) Process(msg *pb.Message) (*pb.Message, error) {
- log := logger.DefaultLogger(os.Stdout,
loggertype.LogLevelInfo).WithName("go-online-processor")
+ log := logger.DefaultLogger(os.Stdout,
loggertypes.LogLevelInfo).WithName("go-online-processor")
// Handle go online message - parse ServerInfo and extract AES secret
log.Info("received GO_ONLINE message from manager")
@@ -188,7 +188,7 @@ func NewCollectCyclicDataProcessor(client *GrpcClient,
scheduler JobScheduler) *
}
func (p *CollectCyclicDataProcessor) Process(msg *pb.Message) (*pb.Message,
error) {
- log := logger.DefaultLogger(os.Stdout,
loggertype.LogLevelInfo).WithName("cyclic-task-processor")
+ log := logger.DefaultLogger(os.Stdout,
loggertypes.LogLevelInfo).WithName("cyclic-task-processor")
log.Info("processing cyclic task message", "identity", msg.Identity)
if p.scheduler == nil {
@@ -229,7 +229,7 @@ func (p *CollectCyclicDataProcessor) Process(msg
*pb.Message) (*pb.Message, erro
"monitorID", job.MonitorID,
"app", job.App)
- // Add job to scheduler
+ // Add job to scheduler - convert to interface{} for compatibility
if err := p.scheduler.AddAsyncCollectJob(&job); err != nil {
log.Error(err, "Failed to add cyclic job to scheduler: %v")
return &pb.Message{
diff --git a/internal/transport/transport.go b/internal/transport/transport.go
index 807630a..8d0c155 100644
--- a/internal/transport/transport.go
+++ b/internal/transport/transport.go
@@ -22,9 +22,9 @@ import (
"encoding/json"
"fmt"
"os"
+ "time"
pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
- config2 "hertzbeat.apache.org/hertzbeat-collector-go/internal/config"
clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector"
configtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
@@ -72,24 +72,6 @@ func NewFromConfig(cfg *configtypes.CollectorConfig) *Runner
{
return New(cfg)
}
-// NewFromEnv creates a new transport runner from environment variables
-func NewFromEnv() *Runner {
- envLoader := config2.NewEnvConfigLoader()
- cfg := envLoader.LoadFromEnv()
- return NewFromConfig(cfg)
-}
-
-// NewFromUnifiedConfig creates a new transport runner using unified
configuration loading
-// It loads from file first, then overrides with environment variables
-func NewFromUnifiedConfig(cfgPath string) (*Runner, error) {
- unifiedLoader := config2.NewUnifiedConfigLoader(cfgPath)
- cfg, err := unifiedLoader.Load()
- if err != nil {
- return nil, err
- }
- return NewFromConfig(cfg), nil
-}
-
func (r *Runner) Start(ctx context.Context) error {
// 初始化 Logger 如果它还没有被设置
if r.Logger.IsZero() {
@@ -125,71 +107,99 @@ func (r *Runner) Start(ctx context.Context) error {
factory := &TransportClientFactory{}
client, err := factory.CreateClient(protocol, addr)
if err != nil {
- r.Logger.Error(err, "Failed to create transport client")
- return err
- }
-
- // Set the identity on the client if it supports it
- identity := r.Config.Collector.Identity
- if identity == "" {
- identity = DefaultIdentity
- }
+ r.Logger.Error(err, "Failed to create transport client, will
retry in background")
+ // 不直接返回错误,而是继续启动,后续会在后台重试连接
+ } else {
+ // Set the identity on the client if it supports it
+ identity := r.Config.Collector.Identity
+ if identity == "" {
+ identity = DefaultIdentity
+ }
- if nettyClient, ok := client.(*NettyClient); ok {
- nettyClient.SetIdentity(identity)
- }
+ if nettyClient, ok := client.(*NettyClient); ok {
+ nettyClient.SetIdentity(identity)
+ }
- r.client = client
-
- // 设置事件处理器
- switch c := client.(type) {
- case *GrpcClient:
- c.SetEventHandler(func(event Event) {
- switch event.Type {
- case EventConnected:
- r.Logger.Info("Connected to manager gRPC
server", "addr", event.Address)
- go r.sendOnlineMessage()
- case EventDisconnected:
- r.Logger.Info("Disconnected from manager gRPC
server", "addr", event.Address)
- case EventConnectFailed:
- r.Logger.Error(event.Error, "Failed to connect
to manager gRPC server", "addr", event.Address)
+ r.client = client
+
+ // 设置事件处理器
+ switch c := client.(type) {
+ case *GrpcClient:
+ c.SetEventHandler(func(event Event) {
+ switch event.Type {
+ case EventConnected:
+ r.Logger.Info("Connected to manager
gRPC server", "addr", event.Address)
+ go r.sendOnlineMessage()
+ case EventDisconnected:
+ r.Logger.Info("Disconnected from
manager gRPC server", "addr", event.Address)
+ case EventConnectFailed:
+ r.Logger.Error(event.Error, "Failed to
connect to manager gRPC server, will retry", "addr", event.Address)
+ }
+ })
+ // Register processors with job scheduler
+ if r.jobScheduler != nil {
+ RegisterDefaultProcessors(c, r.jobScheduler)
+ r.Logger.Info("Registered gRPC processors with
job scheduler")
+ } else {
+ RegisterDefaultProcessors(c, nil)
+ r.Logger.Info("Registered gRPC processors
without job scheduler")
}
- })
- // Register processors with job scheduler
- if r.jobScheduler != nil {
- RegisterDefaultProcessors(c, r.jobScheduler)
- r.Logger.Info("Registered gRPC processors with job
scheduler")
- } else {
- RegisterDefaultProcessors(c, nil)
- r.Logger.Info("Registered gRPC processors without job
scheduler")
- }
- case *NettyClient:
- c.SetEventHandler(func(event Event) {
- switch event.Type {
- case EventConnected:
- r.Logger.Info("Connected to manager netty
server", "addr", event.Address)
- go r.sendOnlineMessage()
- case EventDisconnected:
- r.Logger.Info("Disconnected from manager netty
server", "addr", event.Address)
- case EventConnectFailed:
- r.Logger.Error(event.Error, "Failed to connect
to manager netty server", "addr", event.Address)
+ case *NettyClient:
+ c.SetEventHandler(func(event Event) {
+ switch event.Type {
+ case EventConnected:
+ r.Logger.Info("Connected to manager
netty server", "addr", event.Address)
+ go r.sendOnlineMessage()
+ case EventDisconnected:
+ r.Logger.Info("Disconnected from
manager netty server", "addr", event.Address)
+ case EventConnectFailed:
+ r.Logger.Error(event.Error, "Failed to
connect to manager netty server, will retry", "addr", event.Address)
+ }
+ })
+ // Register processors with job scheduler
+ if r.jobScheduler != nil {
+ RegisterDefaultNettyProcessors(c,
r.jobScheduler)
+ r.Logger.Info("Registered netty processors with
job scheduler")
+ } else {
+ RegisterDefaultNettyProcessors(c, nil)
+ r.Logger.Info("Registered netty processors
without job scheduler")
}
- })
- // Register processors with job scheduler
- if r.jobScheduler != nil {
- RegisterDefaultNettyProcessors(c, r.jobScheduler)
- r.Logger.Info("Registered netty processors with job
scheduler")
- } else {
- RegisterDefaultNettyProcessors(c, nil)
- r.Logger.Info("Registered netty processors without job
scheduler")
}
- }
- if err := r.client.Start(); err != nil {
- r.Logger.Error(err, "Failed to start transport client")
- return err
+ // 尝试启动客户端,如果失败则启动重连循环
+ if err := r.client.Start(); err != nil {
+ r.Logger.Error(err, "Failed to start transport client
on first attempt, starting retry loop")
+
+ // 在后台启动重连循环
+ go func() {
+ attempt := 0
+ for {
+ attempt++
+ r.Logger.Info("Reconnection attempt",
"attempt", attempt, "wait", "3s")
+ time.Sleep(3 * time.Second)
+
+ // 检查是否应该停止
+ select {
+ case <-ctx.Done():
+ r.Logger.Info("Reconnection
loop stopped due to context cancellation")
+ return
+ default:
+ }
+
+ r.Logger.Info("Trying to connect to
manager", "attempt", attempt, "addr", addr)
+ if err := r.client.Start(); err == nil {
+ r.Logger.Info("Successfully
connected to manager", "attempt", attempt)
+ return
+ } else {
+ r.Logger.Error(err, "Connection
failed, will retry", "attempt", attempt)
+ }
+ }
+ }()
+ }
}
+ r.Logger.Info("Transport runner started successfully, connection will
be established in background")
+
// 创建新的context用于监控关闭信号
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -198,7 +208,9 @@ func (r *Runner) Start(ctx context.Context) error {
go func() {
<-ctx.Done()
r.Logger.Info("Shutting down transport client...")
- _ = r.client.Shutdown()
+ if r.client != nil {
+ _ = r.client.Shutdown()
+ }
}()
// 阻塞直到 ctx.Done
diff --git a/internal/types/config/config_types.go
b/internal/types/config/config_types.go
index ee8d8c5..4398cce 100644
--- a/internal/types/config/config_types.go
+++ b/internal/types/config/config_types.go
@@ -25,23 +25,24 @@ type CollectorSection struct {
Info CollectorInfo `yaml:"info"`
Log CollectorLogConfig `yaml:"log"`
Manager ManagerConfig `yaml:"manager"`
- Identity string `yaml:"identity" env:"IDENTITY"`
- Mode string `yaml:"mode" env:"MODE"`
- // Add Dispatcher if needed
+ Identity string `yaml:"identity"`
+ Mode string `yaml:"mode"`
+ // todo dispatcher
}
type CollectorInfo struct {
- Name string `yaml:"name"`
IP string `yaml:"ip"`
+ Name string `yaml:"name"`
Port string `yaml:"port"`
}
type CollectorLogConfig struct {
+ // Level is one of: debug, info, warn, error, panic, fatal
Level string `yaml:"level"`
}
type ManagerConfig struct {
- Host string `yaml:"host" env:"MANAGER_HOST"`
- Port string `yaml:"port" env:"MANAGER_PORT"`
- Protocol string `yaml:"protocol" env:"MANAGER_PROTOCOL"`
+ Host string `yaml:"host"`
+ Port string `yaml:"port"`
+ Protocol string `yaml:"protocol"`
}
diff --git a/internal/util/logger/logger.go b/internal/util/logger/logger.go
index cc657eb..8ef1456 100644
--- a/internal/util/logger/logger.go
+++ b/internal/util/logger/logger.go
@@ -25,6 +25,7 @@ import (
"github.com/go-logr/zapr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]