This is an automated email from the ASF dual-hosted git repository.

zhaoqingran pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 652f928  feat: optimize cfg and binary start (#36)
652f928 is described below

commit 652f928e339a6c39099f55bdd9cd2036c94a7c22
Author: shown <[email protected]>
AuthorDate: Wed Dec 17 17:47:19 2025 +0800

    feat: optimize cfg and binary start (#36)
---
 etc/hertzbeat-collector.yaml          |   2 +-
 go.mod                                |   1 +
 go.sum                                |  11 +-
 internal/config/config.go             | 278 +++++++++++++++++++++-----
 internal/config/config_factory.go     | 353 ----------------------------------
 internal/config/env_config.go         |  84 --------
 internal/config/unified_config.go     | 114 -----------
 internal/constants/const.go           |  80 ++++----
 internal/transport/netty_client.go    |  27 +--
 internal/transport/processors.go      |   8 +-
 internal/transport/transport.go       | 166 ++++++++--------
 internal/types/config/config_types.go |  15 +-
 internal/util/logger/logger.go        |   1 +
 13 files changed, 395 insertions(+), 745 deletions(-)

diff --git a/etc/hertzbeat-collector.yaml b/etc/hertzbeat-collector.yaml
index e67a1cd..5e87865 100644
--- a/etc/hertzbeat-collector.yaml
+++ b/etc/hertzbeat-collector.yaml
@@ -22,7 +22,7 @@ collector:
     port: 8080
 
   log:
-    level: debug
+    level: info
 
   # Transport/Manager configuration
   manager:
diff --git a/go.mod b/go.mod
index df2342e..d840a0e 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ require (
        github.com/alicebob/miniredis/v2 v2.35.0
        github.com/apache/arrow/go/v13 v13.0.0
        github.com/expr-lang/expr v1.16.9
+       github.com/fsnotify/fsnotify v1.4.7
        github.com/go-logr/logr v1.4.3
        github.com/go-logr/zapr v1.3.0
        github.com/go-sql-driver/mysql v1.9.3
diff --git a/go.sum b/go.sum
index 5ba8df6..d52801f 100644
--- a/go.sum
+++ b/go.sum
@@ -28,11 +28,11 @@ github.com/armon/consul-api 
v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5
 github.com/aymerick/raymond 
v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod 
h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod 
h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
-github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod 
h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/bsm/ginkgo/v2 v2.12.0 
h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
 github.com/bsm/ginkgo/v2 v2.12.0/go.mod 
h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
 github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
 github.com/bsm/gomega v1.27.10/go.mod 
h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
+github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod 
h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/cespare/xxhash/v2 v2.3.0 
h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
 github.com/cespare/xxhash/v2 v2.3.0/go.mod 
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/client9/misspell v0.3.4/go.mod 
h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
@@ -58,6 +58,8 @@ github.com/davecgh/go-spew 
v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/dgraph-io/badger v1.6.0/go.mod 
h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod 
h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f 
h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod 
h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
 github.com/dustin/go-humanize v1.0.0/go.mod 
h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
 github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod 
h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM=
 github.com/envoyproxy/go-control-plane v0.9.0/go.mod 
h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
@@ -66,12 +68,11 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod 
h1:6rpuAdCZL397s3pYoYcLgu1m
 github.com/envoyproxy/go-control-plane 
v0.9.9-0.20210217033140-668b12f5399d/go.mod 
h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod 
h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
 github.com/etcd-io/bbolt v1.3.3/go.mod 
h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
-github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f 
h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
-github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod 
h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
 github.com/expr-lang/expr v1.16.9 
h1:WUAzmR0JNI9JCiF0/ewwHB1gmcGw5wW7nWt8gc6PpCI=
 github.com/expr-lang/expr v1.16.9/go.mod 
h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
 github.com/fasthttp-contrib/websocket 
v0.0.0-20160511215533-1f3b11f56072/go.mod 
h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8=
 github.com/fatih/structs v1.1.0/go.mod 
h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
+github.com/fsnotify/fsnotify v1.4.7 
h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
 github.com/fsnotify/fsnotify v1.4.7/go.mod 
h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 github.com/gavv/httpexpect v2.0.0+incompatible/go.mod 
h1:x+9tiU1YnrOvnB725RkpoLv1M62hOWzwo5OXotisrKc=
 github.com/getsentry/sentry-go v0.12.0 
h1:era7g0re5iY13bHSdN/xMkyV+5zZppjRVQhZrXCaEIk=
@@ -256,11 +257,11 @@ github.com/prometheus/common v0.65.0 
h1:QDwzd+G1twt//Kwj/Ww6E9FQq1iVMmODnILtW1t2
 github.com/prometheus/common v0.65.0/go.mod 
h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8=
 github.com/prometheus/procfs v0.16.1 
h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
 github.com/prometheus/procfs v0.16.1/go.mod 
h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
+github.com/redis/go-redis/v9 v9.17.1 
h1:7tl732FjYPRT9H9aNfyTwKg9iTETjWjGKEJ2t/5iWTs=
+github.com/redis/go-redis/v9 v9.17.1/go.mod 
h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
 github.com/rogpeppe/go-internal v1.6.1/go.mod 
h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
 github.com/rogpeppe/go-internal v1.8.1/go.mod 
h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o=
 github.com/rogpeppe/go-internal v1.9.0/go.mod 
h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
-github.com/redis/go-redis/v9 v9.17.1 
h1:7tl732FjYPRT9H9aNfyTwKg9iTETjWjGKEJ2t/5iWTs=
-github.com/redis/go-redis/v9 v9.17.1/go.mod 
h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
 github.com/rogpeppe/go-internal v1.13.1 
h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
 github.com/rogpeppe/go-internal v1.13.1/go.mod 
h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
 github.com/russross/blackfriday v1.5.2/go.mod 
h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
diff --git a/internal/config/config.go b/internal/config/config.go
index a5a299f..0770e44 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -18,110 +18,290 @@
 package config
 
 import (
+       "context"
        "errors"
+       "fmt"
        "os"
+       "path/filepath"
+       "sync"
+       "sync/atomic"
+       "time"
 
+       "github.com/fsnotify/fsnotify"
        "gopkg.in/yaml.v3"
-       "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
+
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+       cfgtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
        collectortypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/err"
        loggertypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
-
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
-const (
-       DefaultHertzBeatCollectorName = "hertzbeat-collector"
+var (
+       globalConfig atomic.Pointer[cfgtypes.CollectorConfig]
+       configMu     sync.RWMutex
 )
 
-// Loader handles file-based configuration loading
-// It uses the common ConfigFactory for consistency
+// Loader handles file-based configuration loading with hot-reload support
 type Loader struct {
        cfgPath string
-       factory *ConfigFactory
        logger  logger.Logger
 }
 
+// New creates a new configuration loader
 func New(cfgPath string) *Loader {
+
        return &Loader{
                cfgPath: cfgPath,
-               factory: NewConfigFactory(),
+               logger:  logger.DefaultLogger(os.Stdout, 
loggertypes.LogLevelInfo).WithName("config-loader"),
        }
 }
 
-func (l *Loader) LoadConfig() (*config.CollectorConfig, error) {
-       l.logger = logger.DefaultLogger(os.Stdout, 
loggertypes.LogLevelInfo).WithName("config-loader")
-
+// LoadConfig loads configuration from file
+func (l *Loader) LoadConfig() (*cfgtypes.CollectorConfig, error) {
        if l.cfgPath == "" {
-               l.logger.Info("file-config-loader: path is empty, using 
defaults")
-               return l.factory.CreateDefaultConfig(), nil
+               err := errors.New("config path is required")
+               l.logger.Error(err, "config path is empty")
+               return nil, err
        }
 
-       if _, err := os.Stat(l.cfgPath); os.IsNotExist(err) {
-               l.logger.Error(err, "file-config-loader: file not exist", 
"path", l.cfgPath)
+       cfg, err := l.parseConfigFile(l.cfgPath)
+       if err != nil {
                return nil, err
        }
 
-       file, err := os.Open(l.cfgPath)
+       l.logger.Info("configuration loaded successfully", "path", l.cfgPath)
+       return cfg, nil
+}
+
+// parseConfigFile parses the YAML config file
+func (l *Loader) parseConfigFile(path string) (*cfgtypes.CollectorConfig, 
error) {
+       // Resolve symlinks to handle Kubernetes ConfigMap mounts
+       resolved, err := filepath.EvalSymlinks(path)
        if err != nil {
+               resolved = path
+       }
+
+       if _, err := os.Stat(resolved); os.IsNotExist(err) {
+               l.logger.Error(err, "config file not exist", "path", resolved)
+               return nil, err
+       }
+
+       data, err := os.ReadFile(resolved)
+       if err != nil {
+               l.logger.Error(err, "failed to read config file", "path", 
resolved)
                return nil, err
        }
-       defer func(file *os.File) {
-               err := file.Close()
-               if err != nil {
-                       l.logger.Error(err, "close config file failed")
-               }
-       }(file)
 
-       var cfg config.CollectorConfig
-       decoder := yaml.NewDecoder(file)
-       if err := decoder.Decode(&cfg); err != nil {
-               l.logger.Error(err, "decode config file failed")
+       var cfg cfgtypes.CollectorConfig
+       if err := yaml.Unmarshal(data, &cfg); err != nil {
+               l.logger.Error(err, "failed to parse config file", "path", 
resolved)
                return nil, err
        }
 
-       // Fill in any missing fields with defaults
-       filledCfg := l.factory.MergeWithEnv(&cfg)
+       // Apply default values
+       // when some fields are missing in config file
+       l.applyDefaults(&cfg)
 
-       l.logger.Info("Configuration loaded from file", "path", l.cfgPath)
-       return filledCfg, nil
+       return &cfg, nil
 }
 
-func (l *Loader) ValidateConfig(cfg *config.CollectorConfig) error {
-       if l.factory != nil {
-               return l.factory.ValidateConfig(cfg)
+// applyDefaults fills in missing configuration with defaults
+func (l *Loader) applyDefaults(cfg *cfgtypes.CollectorConfig) {
+       if cfg.Collector.Info.Name == "" {
+               cfg.Collector.Info.Name = 
constants.DefaultHertzBeatCollectorName
        }
+       if cfg.Collector.Log.Level == "" {
+               cfg.Collector.Log.Level = string(loggertypes.LogLevelInfo)
+       }
+       if cfg.Collector.Manager.Protocol == "" {
+               cfg.Collector.Manager.Protocol = "http"
+       }
+}
 
-       // Fallback validation if factory is not available
+// ValidateConfig validates the configuration
+func (l *Loader) ValidateConfig(cfg *cfgtypes.CollectorConfig) error {
        if cfg == nil {
-               l.logger.Error(collectortypes.CollectorConfigIsNil, 
"config-loader is nil")
-               return errors.New("config-loader is nil")
+               err := errors.New("config is nil")
+               l.logger.Error(collectortypes.CollectorConfigIsNil, "config 
validation failed")
+               return err
        }
 
-       // other check
        if cfg.Collector.Info.IP == "" {
-               l.logger.Error(collectortypes.CollectorIPIsNil, "collector ip 
is empty")
-               return errors.New("config-loader ip is empty")
+               err := errors.New("collector ip is empty")
+               l.logger.Error(collectortypes.CollectorIPIsNil, "config 
validation failed")
+               return err
        }
 
        if cfg.Collector.Info.Port == "" {
-               l.logger.Error(collectortypes.CollectorPortIsNil, 
"config-loader: port is empty")
-               return errors.New("config-loader port is empty")
+               err := errors.New("collector port is empty")
+               l.logger.Error(collectortypes.CollectorPortIsNil, "config 
validation failed")
+               return err
        }
 
        if cfg.Collector.Info.Name == "" {
-               l.logger.Sugar().Debug("config-loader: name is empty")
-               cfg.Collector.Info.Name = DefaultHertzBeatCollectorName
+               cfg.Collector.Info.Name = 
constants.DefaultHertzBeatCollectorName
+               l.logger.Sugar().Debug("collector name is empty, using default")
        }
 
        return nil
 }
 
-// GetManagerAddress returns the full manager address using the factory
-func (l *Loader) GetManagerAddress(cfg *config.CollectorConfig) string {
-       return l.factory.GetManagerAddress(cfg)
+// GetManagerAddress returns the full manager address
+func (l *Loader) GetManagerAddress(cfg *cfgtypes.CollectorConfig) string {
+       if cfg == nil || cfg.Collector.Manager.Host == "" {
+               return ""
+       }
+       protocol := cfg.Collector.Manager.Protocol
+       if protocol == "" {
+               protocol = "http"
+       }
+       return fmt.Sprintf("%s://%s:%s", protocol, cfg.Collector.Manager.Host, 
cfg.Collector.Manager.Port)
+}
+
+// PrintConfig prints the configuration
+func (l *Loader) PrintConfig(cfg *cfgtypes.CollectorConfig) {
+       if cfg == nil {
+               l.logger.Info("config is nil")
+               return
+       }
+       l.logger.Info("current configuration",
+               "name", cfg.Collector.Info.Name,
+               "ip", cfg.Collector.Info.IP,
+               "port", cfg.Collector.Info.Port,
+               "log_level", cfg.Collector.Log.Level,
+               "manager", l.GetManagerAddress(cfg),
+       )
+}
+
+// GetGlobalConfig returns the current global configuration
+func GetGlobalConfig() *cfgtypes.CollectorConfig {
+       return globalConfig.Load()
+}
+
+// SetGlobalConfig sets the global configuration
+func SetGlobalConfig(cfg *cfgtypes.CollectorConfig) {
+       configMu.Lock()
+       defer configMu.Unlock()
+       globalConfig.Store(cfg)
+}
+
+// WatchConfigAndReload watches the config file and reloads on changes
+// This function implements hot-reload for both configuration and logging
+func (l *Loader) WatchConfigAndReload(ctx context.Context) error {
+       watcher, err := fsnotify.NewWatcher()
+       if err != nil {
+               l.logger.Error(err, "failed to create config watcher")
+               return err
+       }
+       defer watcher.Close()
+
+       // Watch both the file and its directory to handle symlink swaps 
(Kubernetes ConfigMap)
+       cfgFile := l.cfgPath
+       cfgDir := filepath.Dir(cfgFile)
+
+       if err := watcher.Add(cfgDir); err != nil {
+               l.logger.Error(err, "failed to watch config directory", "dir", 
cfgDir)
+               return err
+       }
+
+       // Try to watch the file directly (best-effort)
+       _ = watcher.Add(cfgFile)
+
+       l.logger.Info("config file watcher started", "path", cfgFile)
+
+       // Debounce events
+       var (
+               pending bool
+               last    time.Time
+       )
+
+       reload := func() {
+               l.logger.Info("config file changed, reloading...")
+
+               // Parse new configuration
+               newCfg, err := l.parseConfigFile(cfgFile)
+               if err != nil {
+                       l.logger.Error(err, "failed to reload config")
+                       return
+               }
+
+               // Validate new configuration
+               if err := l.ValidateConfig(newCfg); err != nil {
+                       l.logger.Error(err, "invalid config after reload")
+                       return
+               }
+
+               // Update global configuration
+               SetGlobalConfig(newCfg)
+
+               // Hot-reload logging configuration
+               if err := l.reloadLogging(newCfg); err != nil {
+                       l.logger.Error(err, "failed to reload logging")
+               }
+
+               l.logger.Info("configuration reloaded successfully")
+       }
+
+       for {
+               select {
+               case <-ctx.Done():
+                       l.logger.Info("config watcher stopped")
+                       return ctx.Err()
+
+               case event, ok := <-watcher.Events:
+                       if !ok {
+                               return nil
+                       }
+
+                       // Handle Write, Create, Remove, Rename, Chmod events
+                       if 
event.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Remove|fsnotify.Rename|fsnotify.Chmod)
 != 0 {
+                               // Check if the event pertains to the config 
file or directory
+                               if filepath.Base(event.Name) == 
filepath.Base(cfgFile) || filepath.Dir(event.Name) == cfgDir {
+                                       // Debounce: if not pending or enough 
time has passed
+                                       if !pending || time.Since(last) > 
250*time.Millisecond {
+                                               pending = true
+                                               last = time.Now()
+                                               // Slight delay to let file 
settle
+                                               go func() {
+                                                       time.Sleep(300 * 
time.Millisecond)
+                                                       reload()
+                                                       pending = false
+                                               }()
+                                       }
+                               }
+                       }
+
+               case err, ok := <-watcher.Errors:
+                       if !ok {
+                               return nil
+                       }
+                       l.logger.Error(err, "config watcher error")
+               }
+       }
 }
 
-// PrintConfig prints the configuration using the factory
-func (l *Loader) PrintConfig(cfg *config.CollectorConfig) {
-       l.factory.PrintConfig(cfg)
+// reloadLogging reloads the logging configuration
+// when config file changes, it helps dynamically
+// adjust the log level for dynamic debugging
+func (l *Loader) reloadLogging(cfg *cfgtypes.CollectorConfig) error {
+
+       if cfg == nil {
+               return errors.New("config is nil")
+       }
+
+       // Parse log level from config
+       level := loggertypes.LogLevel(cfg.Collector.Log.Level)
+       if level == "" {
+               level = loggertypes.LogLevelInfo
+       }
+
+       // Create new logger with updated level
+       newLogger := logger.DefaultLogger(os.Stdout, 
level).WithName("config-loader")
+
+       // Update loader's logger
+       l.logger = newLogger
+
+       l.logger.Info("logging configuration reloaded", "level", level)
+       return nil
 }
diff --git a/internal/config/config_factory.go 
b/internal/config/config_factory.go
deleted file mode 100644
index 7854d4f..0000000
--- a/internal/config/config_factory.go
+++ /dev/null
@@ -1,353 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package config
-
-import (
-       "fmt"
-       "os"
-       "strconv"
-       "strings"
-
-       "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
-       loggertypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
-       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
-)
-
-// ConfigDefaults contains all default configuration values
-type ConfigDefaults struct {
-       Identity        string
-       Mode            string
-       CollectorName   string
-       CollectorIP     string
-       CollectorPort   string
-       ManagerHost     string
-       ManagerPort     string
-       ManagerProtocol string
-       LogLevel        string
-}
-
-// GetDefaultConfig returns the default configuration values
-func GetDefaultConfig() *ConfigDefaults {
-       return &ConfigDefaults{
-               Identity:        "hertzbeat-collector-go",
-               Mode:            "public",
-               CollectorName:   "hertzbeat-collector-go",
-               CollectorIP:     "127.0.0.1",
-               CollectorPort:   "8080",
-               ManagerHost:     "127.0.0.1",
-               ManagerPort:     "1158",
-               ManagerProtocol: "netty",
-               LogLevel:        "info",
-       }
-}
-
-// ConfigFactory provides factory methods for creating configurations
-type ConfigFactory struct {
-       defaults *ConfigDefaults
-       logger   logger.Logger
-}
-
-// NewConfigFactory creates a new configuration factory
-func NewConfigFactory() *ConfigFactory {
-       return &ConfigFactory{
-               defaults: GetDefaultConfig(),
-               logger:   logger.DefaultLogger(os.Stdout, 
loggertypes.LogLevelInfo).WithName("config-factory"),
-       }
-}
-
-// CreateDefaultConfig creates a configuration with default values
-func (f *ConfigFactory) CreateDefaultConfig() *config.CollectorConfig {
-       return &config.CollectorConfig{
-               Collector: config.CollectorSection{
-                       Info: config.CollectorInfo{
-                               Name: f.defaults.CollectorName,
-                               IP:   f.defaults.CollectorIP,
-                               Port: f.defaults.CollectorPort,
-                       },
-                       Log: config.CollectorLogConfig{
-                               Level: f.defaults.LogLevel,
-                       },
-                       Manager: config.ManagerConfig{
-                               Host:     f.defaults.ManagerHost,
-                               Port:     f.defaults.ManagerPort,
-                               Protocol: f.defaults.ManagerProtocol,
-                       },
-                       Identity: f.defaults.Identity,
-                       Mode:     f.defaults.Mode,
-               },
-       }
-}
-
-// CreateFromEnv creates configuration from environment variables with 
fallback to defaults
-func (f *ConfigFactory) CreateFromEnv() *config.CollectorConfig {
-       cfg := f.CreateDefaultConfig()
-
-       // Override with environment variables if they exist
-       if identity := os.Getenv("IDENTITY"); identity != "" {
-               cfg.Collector.Identity = identity
-       }
-
-       if mode := os.Getenv("MODE"); mode != "" {
-               cfg.Collector.Mode = mode
-       }
-
-       if name := os.Getenv("COLLECTOR_NAME"); name != "" {
-               cfg.Collector.Info.Name = name
-       }
-
-       if ip := os.Getenv("COLLECTOR_IP"); ip != "" {
-               cfg.Collector.Info.IP = ip
-       }
-
-       if port := os.Getenv("COLLECTOR_PORT"); port != "" {
-               cfg.Collector.Info.Port = port
-       }
-
-       if host := os.Getenv("MANAGER_HOST"); host != "" {
-               cfg.Collector.Manager.Host = host
-       }
-
-       if port := os.Getenv("MANAGER_PORT"); port != "" {
-               cfg.Collector.Manager.Port = port
-       }
-
-       if protocol := os.Getenv("MANAGER_PROTOCOL"); protocol != "" {
-               cfg.Collector.Manager.Protocol = protocol
-       }
-
-       if level := os.Getenv("LOG_LEVEL"); level != "" {
-               cfg.Collector.Log.Level = level
-       }
-
-       f.logger.Info("Configuration created from environment variables",
-               "identity", cfg.Collector.Identity,
-               "mode", cfg.Collector.Mode,
-               "manager", fmt.Sprintf("%s:%s", cfg.Collector.Manager.Host, 
cfg.Collector.Manager.Port))
-
-       return cfg
-}
-
-// MergeWithEnv merges file configuration with environment variable overrides
-func (f *ConfigFactory) MergeWithEnv(fileCfg *config.CollectorConfig) 
*config.CollectorConfig {
-       if fileCfg == nil {
-               return f.CreateFromEnv()
-       }
-
-       // Start with file configuration
-       cfg := *fileCfg
-
-       // Override with environment variables if they exist
-       if identity := os.Getenv("IDENTITY"); identity != "" {
-               cfg.Collector.Identity = identity
-       }
-
-       if mode := os.Getenv("MODE"); mode != "" {
-               cfg.Collector.Mode = mode
-       }
-
-       if name := os.Getenv("COLLECTOR_NAME"); name != "" {
-               cfg.Collector.Info.Name = name
-       }
-
-       if ip := os.Getenv("COLLECTOR_IP"); ip != "" {
-               cfg.Collector.Info.IP = ip
-       }
-
-       if port := os.Getenv("COLLECTOR_PORT"); port != "" {
-               cfg.Collector.Info.Port = port
-       }
-
-       if host := os.Getenv("MANAGER_HOST"); host != "" {
-               cfg.Collector.Manager.Host = host
-       }
-
-       if port := os.Getenv("MANAGER_PORT"); port != "" {
-               cfg.Collector.Manager.Port = port
-       }
-
-       if protocol := os.Getenv("MANAGER_PROTOCOL"); protocol != "" {
-               cfg.Collector.Manager.Protocol = protocol
-       }
-
-       if level := os.Getenv("LOG_LEVEL"); level != "" {
-               cfg.Collector.Log.Level = level
-       }
-
-       // Fill in defaults for any empty fields
-       f.fillDefaults(&cfg)
-
-       f.logger.Info("Configuration merged with environment variables")
-       return &cfg
-}
-
-// fillDefaults fills in default values for any empty fields
-func (f *ConfigFactory) fillDefaults(cfg *config.CollectorConfig) {
-       if cfg.Collector.Identity == "" {
-               cfg.Collector.Identity = f.defaults.Identity
-       }
-
-       if cfg.Collector.Mode == "" {
-               cfg.Collector.Mode = f.defaults.Mode
-       }
-
-       if cfg.Collector.Info.Name == "" {
-               cfg.Collector.Info.Name = f.defaults.CollectorName
-       }
-
-       if cfg.Collector.Info.IP == "" {
-               cfg.Collector.Info.IP = f.defaults.CollectorIP
-       }
-
-       if cfg.Collector.Info.Port == "" {
-               cfg.Collector.Info.Port = f.defaults.CollectorPort
-       }
-
-       if cfg.Collector.Manager.Host == "" {
-               cfg.Collector.Manager.Host = f.defaults.ManagerHost
-       }
-
-       if cfg.Collector.Manager.Port == "" {
-               cfg.Collector.Manager.Port = f.defaults.ManagerPort
-       }
-
-       if cfg.Collector.Manager.Protocol == "" {
-               cfg.Collector.Manager.Protocol = f.defaults.ManagerProtocol
-       }
-
-       if cfg.Collector.Log.Level == "" {
-               cfg.Collector.Log.Level = f.defaults.LogLevel
-       }
-}
-
-// ValidateConfig validates the configuration
-func (f *ConfigFactory) ValidateConfig(cfg *config.CollectorConfig) error {
-       if cfg == nil {
-               return fmt.Errorf("configuration is nil")
-       }
-
-       // Validate required fields
-       if cfg.Collector.Identity == "" {
-               return fmt.Errorf("collector identity is required")
-       }
-
-       if cfg.Collector.Mode == "" {
-               return fmt.Errorf("collector mode is required")
-       }
-
-       if cfg.Collector.Manager.Host == "" {
-               return fmt.Errorf("manager host is required")
-       }
-
-       if cfg.Collector.Manager.Port == "" {
-               return fmt.Errorf("manager port is required")
-       }
-
-       // Validate mode
-       validModes := map[string]bool{
-               "public":  true,
-               "private": true,
-       }
-       if !validModes[strings.ToLower(cfg.Collector.Mode)] {
-               return fmt.Errorf("invalid mode: %s, must be 'public' or 
'private'", cfg.Collector.Mode)
-       }
-
-       // Validate protocol
-       validProtocols := map[string]bool{
-               "netty": true,
-               "grpc":  true,
-       }
-       if cfg.Collector.Manager.Protocol != "" && 
!validProtocols[strings.ToLower(cfg.Collector.Manager.Protocol)] {
-               return fmt.Errorf("invalid protocol: %s, must be 'netty' or 
'grpc'", cfg.Collector.Manager.Protocol)
-       }
-
-       // Validate port numbers
-       if _, err := strconv.Atoi(cfg.Collector.Info.Port); err != nil {
-               return fmt.Errorf("invalid collector port: %s", 
cfg.Collector.Info.Port)
-       }
-
-       if _, err := strconv.Atoi(cfg.Collector.Manager.Port); err != nil {
-               return fmt.Errorf("invalid manager port: %s", 
cfg.Collector.Manager.Port)
-       }
-
-       f.logger.Info("Configuration validation passed")
-       return nil
-}
-
-// GetManagerAddress returns the full manager address (host:port)
-func (f *ConfigFactory) GetManagerAddress(cfg *config.CollectorConfig) string {
-       if cfg == nil || cfg.Collector.Manager.Host == "" || 
cfg.Collector.Manager.Port == "" {
-               return fmt.Sprintf("%s:%s", f.defaults.ManagerHost, 
f.defaults.ManagerPort)
-       }
-       return fmt.Sprintf("%s:%s", cfg.Collector.Manager.Host, 
cfg.Collector.Manager.Port)
-}
-
-// IsPublicMode checks if the collector is running in public mode
-func (f *ConfigFactory) IsPublicMode(cfg *config.CollectorConfig) bool {
-       if cfg == nil {
-               return strings.ToLower(f.defaults.Mode) == "public"
-       }
-       return strings.ToLower(cfg.Collector.Mode) == "public"
-}
-
-// IsPrivateMode checks if the collector is running in private mode
-func (f *ConfigFactory) IsPrivateMode(cfg *config.CollectorConfig) bool {
-       if cfg == nil {
-               return strings.ToLower(f.defaults.Mode) == "private"
-       }
-       return strings.ToLower(cfg.Collector.Mode) == "private"
-}
-
-// PrintConfig prints the configuration in a readable format
-func (f *ConfigFactory) PrintConfig(cfg *config.CollectorConfig) {
-       if cfg == nil {
-               f.logger.Error(fmt.Errorf("configuration is nil"), 
"Configuration is nil")
-               return
-       }
-
-       f.logger.Info("=== Collector Configuration ===")
-       f.logger.Info("Identity", "value", cfg.Collector.Identity)
-       f.logger.Info("Mode", "value", cfg.Collector.Mode)
-       f.logger.Info("Collector Name", "value", cfg.Collector.Info.Name)
-       f.logger.Info("Collector Address", "value", fmt.Sprintf("%s:%s", 
cfg.Collector.Info.IP, cfg.Collector.Info.Port))
-       f.logger.Info("Manager Address", "value", fmt.Sprintf("%s:%s", 
cfg.Collector.Manager.Host, cfg.Collector.Manager.Port))
-       f.logger.Info("Manager Protocol", "value", 
cfg.Collector.Manager.Protocol)
-       f.logger.Info("Log Level", "value", cfg.Collector.Log.Level)
-       f.logger.Info("===============================")
-}
-
-// === Configuration Entry Points ===
-
-// LoadFromFile loads configuration from file only
-func LoadFromFile(cfgPath string) (*config.CollectorConfig, error) {
-       loader := New(cfgPath)
-       return loader.LoadConfig()
-}
-
-// LoadFromEnv loads configuration from environment variables only
-func LoadFromEnv() *config.CollectorConfig {
-       envLoader := NewEnvConfigLoader()
-       return envLoader.LoadFromEnv()
-}
-
-// LoadUnified loads configuration with file + env priority (recommended)
-func LoadUnified(cfgPath string) (*config.CollectorConfig, error) {
-       unifiedLoader := NewUnifiedConfigLoader(cfgPath)
-       return unifiedLoader.Load()
-}
diff --git a/internal/config/env_config.go b/internal/config/env_config.go
deleted file mode 100644
index 40c09c1..0000000
--- a/internal/config/env_config.go
+++ /dev/null
@@ -1,84 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package config
-
-import (
-       "os"
-
-       "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
-       loggertypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
-       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
-)
-
-// EnvConfigLoader handles environment variable configuration
-// It uses the common ConfigFactory for consistency
-type EnvConfigLoader struct {
-       factory *ConfigFactory
-       logger  logger.Logger
-}
-
-// NewEnvConfigLoader creates a new environment variable configuration loader
-func NewEnvConfigLoader() *EnvConfigLoader {
-       return &EnvConfigLoader{
-               factory: NewConfigFactory(),
-               logger:  logger.DefaultLogger(os.Stdout, 
loggertypes.LogLevelInfo).WithName("env-config-loader"),
-       }
-}
-
-// LoadFromEnv loads configuration from environment variables with defaults
-func (l *EnvConfigLoader) LoadFromEnv() *config.CollectorConfig {
-       cfg := l.factory.CreateFromEnv()
-       l.logger.Info("Configuration loaded from environment variables")
-       return cfg
-}
-
-// LoadWithDefaults loads configuration with environment variables overriding 
provided defaults
-func (l *EnvConfigLoader) LoadWithDefaults(defaultCfg *config.CollectorConfig) 
*config.CollectorConfig {
-       if defaultCfg == nil {
-               return l.LoadFromEnv()
-       }
-
-       cfg := l.factory.MergeWithEnv(defaultCfg)
-       l.logger.Info("Configuration loaded with environment variable 
overrides")
-       return cfg
-}
-
-// ValidateEnvConfig validates the environment configuration
-func (l *EnvConfigLoader) ValidateEnvConfig(cfg *config.CollectorConfig) error 
{
-       return l.factory.ValidateConfig(cfg)
-}
-
-// PrintEnvConfig prints the current environment configuration
-func (l *EnvConfigLoader) PrintEnvConfig(cfg *config.CollectorConfig) {
-       l.factory.PrintConfig(cfg)
-}
-
-// GetManagerAddress returns the full manager address (host:port)
-func (l *EnvConfigLoader) GetManagerAddress(cfg *config.CollectorConfig) 
string {
-       return l.factory.GetManagerAddress(cfg)
-}
-
-// IsPublicMode checks if the collector is running in public mode
-func (l *EnvConfigLoader) IsPublicMode(cfg *config.CollectorConfig) bool {
-       return l.factory.IsPublicMode(cfg)
-}
-
-// IsPrivateMode checks if the collector is running in private mode
-func (l *EnvConfigLoader) IsPrivateMode(cfg *config.CollectorConfig) bool {
-       return l.factory.IsPrivateMode(cfg)
-}
diff --git a/internal/config/unified_config.go 
b/internal/config/unified_config.go
deleted file mode 100644
index 2640448..0000000
--- a/internal/config/unified_config.go
+++ /dev/null
@@ -1,114 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package config
-
-import (
-       "os"
-
-       "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
-       loggertypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
-       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
-)
-
-// UnifiedConfigLoader provides a unified configuration loading interface
-// that loads from file first, then overrides with environment variables
-// It uses the common ConfigFactory for consistency
-type UnifiedConfigLoader struct {
-       fileLoader *Loader
-       envLoader  *EnvConfigLoader
-       factory    *ConfigFactory
-       logger     logger.Logger
-}
-
-// NewUnifiedConfigLoader creates a new unified configuration loader
-func NewUnifiedConfigLoader(cfgPath string) *UnifiedConfigLoader {
-       return &UnifiedConfigLoader{
-               fileLoader: New(cfgPath),
-               envLoader:  NewEnvConfigLoader(),
-               factory:    NewConfigFactory(),
-               logger:     logger.DefaultLogger(os.Stdout, 
loggertypes.LogLevelInfo).WithName("unified-config-loader"),
-       }
-}
-
-// Load loads configuration from file and environment variables
-// Environment variables take precedence over file configuration
-func (l *UnifiedConfigLoader) Load() (*config.CollectorConfig, error) {
-       var cfg *config.CollectorConfig
-       var err error
-
-       // Try to load from file first
-       if l.fileLoader.cfgPath != "" {
-               if cfg, err = l.fileLoader.LoadConfig(); err != nil {
-                       l.logger.Error(err, "Failed to load configuration from 
file, falling back to environment variables only")
-                       cfg = nil
-               } else {
-                       l.logger.Info("Configuration loaded from file", "file", 
l.fileLoader.cfgPath)
-               }
-       }
-
-       // If file loading failed or no file specified, start with defaults
-       if cfg == nil {
-               cfg = l.factory.CreateDefaultConfig()
-               l.logger.Info("Using default configuration")
-       }
-
-       // Merge with environment variables (env takes precedence)
-       finalCfg := l.factory.MergeWithEnv(cfg)
-
-       // Validate the final configuration
-       if err := l.factory.ValidateConfig(finalCfg); err != nil {
-               l.logger.Error(err, "Configuration validation failed")
-               return nil, err
-       }
-
-       l.logger.Info("Unified configuration loaded successfully")
-       return finalCfg, nil
-}
-
-// LoadEnvOnly loads configuration from environment variables only
-func (l *UnifiedConfigLoader) LoadEnvOnly() *config.CollectorConfig {
-       cfg := l.envLoader.LoadFromEnv()
-       l.logger.Info("Configuration loaded from environment variables only")
-       return cfg
-}
-
-// LoadFileOnly loads configuration from file only (without env overrides)
-func (l *UnifiedConfigLoader) LoadFileOnly() (*config.CollectorConfig, error) {
-       if l.fileLoader.cfgPath == "" {
-               cfg := l.factory.CreateDefaultConfig()
-               l.logger.Info("No file specified, using default configuration")
-               return cfg, nil
-       }
-
-       return l.fileLoader.LoadConfig()
-}
-
-// ValidateConfig validates the configuration using the factory
-func (l *UnifiedConfigLoader) ValidateConfig(cfg *config.CollectorConfig) 
error {
-       return l.factory.ValidateConfig(cfg)
-}
-
-// PrintConfig prints the configuration using the factory
-func (l *UnifiedConfigLoader) PrintConfig(cfg *config.CollectorConfig) {
-       l.factory.PrintConfig(cfg)
-}
-
-// GetManagerAddress returns the manager address using the factory
-func (l *UnifiedConfigLoader) GetManagerAddress(cfg *config.CollectorConfig) 
string {
-       return l.factory.GetManagerAddress(cfg)
-}
diff --git a/internal/constants/const.go b/internal/constants/const.go
index d6ac8de..cd68a14 100644
--- a/internal/constants/const.go
+++ b/internal/constants/const.go
@@ -19,72 +19,76 @@
 // ported from org.apache.hertzbeat.common.constants.CommonConstants
 package constants
 
+const (
+  DefaultHertzBeatCollectorName = "hertzbeat-collector"
+)
+
 // Collect Response status code (Go Collector specific)
 const (
-       CollectSuccess       = 0 // Generic success
-       CollectUnavailable   = 1 // Service unavailable (e.g., target service 
error)
-       CollectUnReachable   = 2 // Network unreachable (e.g., network timeout, 
DNS fail)
-       CollectUnConnectable = 3 // Connection failed (e.g., port closed, SSH 
auth fail)
-       CollectFail          = 4 // Generic failure (e.g., query error, script 
exec fail)
-       CollectTimeout       = 5 // Collection timeout
+  CollectSuccess       = 0 // Generic success
+  CollectUnavailable   = 1 // Service unavailable (e.g., target service error)
+  CollectUnReachable   = 2 // Network unreachable (e.g., network timeout, DNS 
fail)
+  CollectUnConnectable = 3 // Connection failed (e.g., port closed, SSH auth 
fail)
+  CollectFail          = 4 // Generic failure (e.g., query error, script exec 
fail)
+  CollectTimeout       = 5 // Collection timeout
 )
 
 // Field parameter types (Aligned with CommonConstants.java)
 const (
-       TYPE_NUMBER = 0 // Field parameter type: number
-       TYPE_STRING = 1 // Field parameter type: String
-       TYPE_SECRET = 2 // Field parameter type: encrypted string
-       TYPE_TIME   = 3 // Field parameter type: time
+  TYPE_NUMBER = 0 // Field parameter type: number
+  TYPE_STRING = 1 // Field parameter type: String
+  TYPE_SECRET = 2 // Field parameter type: encrypted string
+  TYPE_TIME   = 3 // Field parameter type: time
 )
 
 // Monitoring status (Aligned with CommonConstants.java)
 const (
-       MONITOR_PAUSED_CODE = 0 // 0: Paused
-       MONITOR_UP_CODE     = 1 // 1: Up
-       MONITOR_DOWN_CODE   = 2 // 2: Down
+  MONITOR_PAUSED_CODE = 0 // 0: Paused
+  MONITOR_UP_CODE     = 1 // 1: Up
+  MONITOR_DOWN_CODE   = 2 // 2: Down
 )
 
 // Common metric/label keys
 const (
-       // Collection metric value: null placeholder for empty value
-       NULL_VALUE = "&nbsp;"
+  // Collection metric value: null placeholder for empty value
+  NULL_VALUE = "&nbsp;"
 
-       // Common metric keys
-       ErrorMsg      = "errorMsg"
-       RESPONSE_TIME = "responseTime"
-       StatusCode    = "statusCode"
+  // Common metric keys
+  ErrorMsg      = "errorMsg"
+  RESPONSE_TIME = "responseTime"
+  StatusCode    = "statusCode"
 
-       // Label keys (Aligned with CommonConstants.java)
-       LABEL_INSTANCE       = "instance"
-       LABEL_DEFINE_ID      = "defineid"
-       LABEL_ALERT_NAME     = "alertname"
-       LABEL_INSTANCE_HOST  = "instancehost"
-       LABEL_INSTANCE_NAME  = "instancename"
-       LABEL_ALERT_SEVERITY = "severity"
+  // Label keys (Aligned with CommonConstants.java)
+  LABEL_INSTANCE       = "instance"
+  LABEL_DEFINE_ID      = "defineid"
+  LABEL_ALERT_NAME     = "alertname"
+  LABEL_INSTANCE_HOST  = "instancehost"
+  LABEL_INSTANCE_NAME  = "instancename"
+  LABEL_ALERT_SEVERITY = "severity"
 )
 
 // Service specific constants
 const (
-       MongoDbAtlasModel = "mongodb-atlas"
+  MongoDbAtlasModel = "mongodb-atlas"
 
-       // PostgreSQLUnReachAbleCode Specific SQLState for connection failure
-       PostgreSQLUnReachAbleCode = "08001"
-       // ZookeeperApp App name for Zookeeper
-       ZookeeperApp = "zookeeper"
-       // ZookeeperEnviHeader Header string in Zookeeper 'envi' command output
-       ZookeeperEnviHeader = "Environment:"
+  // PostgreSQLUnReachAbleCode Specific SQLState for connection failure
+  PostgreSQLUnReachAbleCode = "08001"
+  // ZookeeperApp App name for Zookeeper
+  ZookeeperApp = "zookeeper"
+  // ZookeeperEnviHeader Header string in Zookeeper 'envi' command output
+  ZookeeperEnviHeader = "Environment:"
 )
 
 // Function related constants
 const (
-       CollectorModule = "collector"
+  CollectorModule = "collector"
 )
 
 // Legacy or alias constants
 // These are kept for compatibility with previous Go code logic
 const (
-       // FieldTypeString Alias for TYPE_STRING
-       FieldTypeString = TYPE_STRING
-       // KeyWord Deprecated placeholder
-       KeyWord = "keyword"
+  // FieldTypeString Alias for TYPE_STRING
+  FieldTypeString = TYPE_STRING
+  // KeyWord Deprecated placeholder
+  KeyWord = "keyword"
 )
diff --git a/internal/transport/netty_client.go 
b/internal/transport/netty_client.go
index ebaeade..ccdd0d0 100644
--- a/internal/transport/netty_client.go
+++ b/internal/transport/netty_client.go
@@ -434,27 +434,28 @@ func (c *NettyClient) connectionMonitor() {
                if !connExists {
                        // Connection is nil, trigger disconnect event
                        c.triggerEvent(EventDisconnected, nil)
-                       log.Println("Connection lost, attempting to 
reconnect...")
+                       log.Println("Connection lost, starting reconnection 
loop...")
                        _ = c.Shutdown()
 
-                       // Attempt to reconnect with exponential backoff
-                       for i := 0; i < 5; i++ {
+                       // Attempt to reconnect continuously with fixed 
3-second interval
+                       attempt := 0
+                       for c.IsStarted() {
+                               attempt++
+                               log.Printf("Reconnection attempt #%d, will 
retry in 3 seconds...", attempt)
+                               time.Sleep(3 * time.Second)
+
                                if !c.IsStarted() {
-                                       break // Exit if shutdown was called
+                                       log.Println("Client shutdown requested, 
stopping reconnection attempts")
+                                       break
                                }
 
-                               backoff := time.Duration(i+1) * 2 * time.Second
-                               log.Printf("Attempting to reconnect in %v...", 
backoff)
-                               time.Sleep(backoff)
-
+                               log.Printf("Reconnection attempt #%d: trying to 
connect to %s...", attempt, c.addr)
                                if err := c.Start(); err == nil {
-                                       log.Println("Reconnected successfully")
+                                       log.Printf("Reconnection attempt #%d: 
SUCCESS - Connected to manager", attempt)
                                        break
                                } else {
-                                       log.Printf("Failed to reconnect: %v", 
err)
-                                       if i == 4 { // Last attempt
-                                               
c.triggerEvent(EventConnectFailed, err)
-                                       }
+                                       log.Printf("Reconnection attempt #%d: 
FAILED - %v", attempt, err)
+                                       c.triggerEvent(EventConnectFailed, err)
                                }
                        }
                }
diff --git a/internal/transport/processors.go b/internal/transport/processors.go
index d969f48..f989861 100644
--- a/internal/transport/processors.go
+++ b/internal/transport/processors.go
@@ -26,7 +26,7 @@ import (
 
        pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
        jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
-       loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
+       loggertypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/crypto"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
@@ -87,7 +87,7 @@ func NewGoOnlineProcessor(client *GrpcClient) 
*GoOnlineProcessor {
 }
 
 func (p *GoOnlineProcessor) Process(msg *pb.Message) (*pb.Message, error) {
-       log := logger.DefaultLogger(os.Stdout, 
loggertype.LogLevelInfo).WithName("go-online-processor")
+       log := logger.DefaultLogger(os.Stdout, 
loggertypes.LogLevelInfo).WithName("go-online-processor")
 
        // Handle go online message - parse ServerInfo and extract AES secret
        log.Info("received GO_ONLINE message from manager")
@@ -188,7 +188,7 @@ func NewCollectCyclicDataProcessor(client *GrpcClient, 
scheduler JobScheduler) *
 }
 
 func (p *CollectCyclicDataProcessor) Process(msg *pb.Message) (*pb.Message, 
error) {
-       log := logger.DefaultLogger(os.Stdout, 
loggertype.LogLevelInfo).WithName("cyclic-task-processor")
+       log := logger.DefaultLogger(os.Stdout, 
loggertypes.LogLevelInfo).WithName("cyclic-task-processor")
        log.Info("processing cyclic task message", "identity", msg.Identity)
 
        if p.scheduler == nil {
@@ -229,7 +229,7 @@ func (p *CollectCyclicDataProcessor) Process(msg 
*pb.Message) (*pb.Message, erro
                "monitorID", job.MonitorID,
                "app", job.App)
 
-       // Add job to scheduler
+       // Add job to scheduler - convert to interface{} for compatibility
        if err := p.scheduler.AddAsyncCollectJob(&job); err != nil {
                log.Error(err, "Failed to add cyclic job to scheduler: %v")
                return &pb.Message{
diff --git a/internal/transport/transport.go b/internal/transport/transport.go
index 807630a..8d0c155 100644
--- a/internal/transport/transport.go
+++ b/internal/transport/transport.go
@@ -22,9 +22,9 @@ import (
        "encoding/json"
        "fmt"
        "os"
+       "time"
 
        pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
-       config2 "hertzbeat.apache.org/hertzbeat-collector-go/internal/config"
        clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector"
        configtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
@@ -72,24 +72,6 @@ func NewFromConfig(cfg *configtypes.CollectorConfig) *Runner 
{
        return New(cfg)
 }
 
-// NewFromEnv creates a new transport runner from environment variables
-func NewFromEnv() *Runner {
-       envLoader := config2.NewEnvConfigLoader()
-       cfg := envLoader.LoadFromEnv()
-       return NewFromConfig(cfg)
-}
-
-// NewFromUnifiedConfig creates a new transport runner using unified 
configuration loading
-// It loads from file first, then overrides with environment variables
-func NewFromUnifiedConfig(cfgPath string) (*Runner, error) {
-       unifiedLoader := config2.NewUnifiedConfigLoader(cfgPath)
-       cfg, err := unifiedLoader.Load()
-       if err != nil {
-               return nil, err
-       }
-       return NewFromConfig(cfg), nil
-}
-
 func (r *Runner) Start(ctx context.Context) error {
        // 初始化 Logger 如果它还没有被设置
        if r.Logger.IsZero() {
@@ -125,71 +107,99 @@ func (r *Runner) Start(ctx context.Context) error {
        factory := &TransportClientFactory{}
        client, err := factory.CreateClient(protocol, addr)
        if err != nil {
-               r.Logger.Error(err, "Failed to create transport client")
-               return err
-       }
-
-       // Set the identity on the client if it supports it
-       identity := r.Config.Collector.Identity
-       if identity == "" {
-               identity = DefaultIdentity
-       }
+               r.Logger.Error(err, "Failed to create transport client, will 
retry in background")
+               // 不直接返回错误,而是继续启动,后续会在后台重试连接
+       } else {
+               // Set the identity on the client if it supports it
+               identity := r.Config.Collector.Identity
+               if identity == "" {
+                       identity = DefaultIdentity
+               }
 
-       if nettyClient, ok := client.(*NettyClient); ok {
-               nettyClient.SetIdentity(identity)
-       }
+               if nettyClient, ok := client.(*NettyClient); ok {
+                       nettyClient.SetIdentity(identity)
+               }
 
-       r.client = client
-
-       // 设置事件处理器
-       switch c := client.(type) {
-       case *GrpcClient:
-               c.SetEventHandler(func(event Event) {
-                       switch event.Type {
-                       case EventConnected:
-                               r.Logger.Info("Connected to manager gRPC 
server", "addr", event.Address)
-                               go r.sendOnlineMessage()
-                       case EventDisconnected:
-                               r.Logger.Info("Disconnected from manager gRPC 
server", "addr", event.Address)
-                       case EventConnectFailed:
-                               r.Logger.Error(event.Error, "Failed to connect 
to manager gRPC server", "addr", event.Address)
+               r.client = client
+
+               // 设置事件处理器
+               switch c := client.(type) {
+               case *GrpcClient:
+                       c.SetEventHandler(func(event Event) {
+                               switch event.Type {
+                               case EventConnected:
+                                       r.Logger.Info("Connected to manager 
gRPC server", "addr", event.Address)
+                                       go r.sendOnlineMessage()
+                               case EventDisconnected:
+                                       r.Logger.Info("Disconnected from 
manager gRPC server", "addr", event.Address)
+                               case EventConnectFailed:
+                                       r.Logger.Error(event.Error, "Failed to 
connect to manager gRPC server, will retry", "addr", event.Address)
+                               }
+                       })
+                       // Register processors with job scheduler
+                       if r.jobScheduler != nil {
+                               RegisterDefaultProcessors(c, r.jobScheduler)
+                               r.Logger.Info("Registered gRPC processors with 
job scheduler")
+                       } else {
+                               RegisterDefaultProcessors(c, nil)
+                               r.Logger.Info("Registered gRPC processors 
without job scheduler")
                        }
-               })
-               // Register processors with job scheduler
-               if r.jobScheduler != nil {
-                       RegisterDefaultProcessors(c, r.jobScheduler)
-                       r.Logger.Info("Registered gRPC processors with job 
scheduler")
-               } else {
-                       RegisterDefaultProcessors(c, nil)
-                       r.Logger.Info("Registered gRPC processors without job 
scheduler")
-               }
-       case *NettyClient:
-               c.SetEventHandler(func(event Event) {
-                       switch event.Type {
-                       case EventConnected:
-                               r.Logger.Info("Connected to manager netty 
server", "addr", event.Address)
-                               go r.sendOnlineMessage()
-                       case EventDisconnected:
-                               r.Logger.Info("Disconnected from manager netty 
server", "addr", event.Address)
-                       case EventConnectFailed:
-                               r.Logger.Error(event.Error, "Failed to connect 
to manager netty server", "addr", event.Address)
+               case *NettyClient:
+                       c.SetEventHandler(func(event Event) {
+                               switch event.Type {
+                               case EventConnected:
+                                       r.Logger.Info("Connected to manager 
netty server", "addr", event.Address)
+                                       go r.sendOnlineMessage()
+                               case EventDisconnected:
+                                       r.Logger.Info("Disconnected from 
manager netty server", "addr", event.Address)
+                               case EventConnectFailed:
+                                       r.Logger.Error(event.Error, "Failed to 
connect to manager netty server, will retry", "addr", event.Address)
+                               }
+                       })
+                       // Register processors with job scheduler
+                       if r.jobScheduler != nil {
+                               RegisterDefaultNettyProcessors(c, 
r.jobScheduler)
+                               r.Logger.Info("Registered netty processors with 
job scheduler")
+                       } else {
+                               RegisterDefaultNettyProcessors(c, nil)
+                               r.Logger.Info("Registered netty processors 
without job scheduler")
                        }
-               })
-               // Register processors with job scheduler
-               if r.jobScheduler != nil {
-                       RegisterDefaultNettyProcessors(c, r.jobScheduler)
-                       r.Logger.Info("Registered netty processors with job 
scheduler")
-               } else {
-                       RegisterDefaultNettyProcessors(c, nil)
-                       r.Logger.Info("Registered netty processors without job 
scheduler")
                }
-       }
 
-       if err := r.client.Start(); err != nil {
-               r.Logger.Error(err, "Failed to start transport client")
-               return err
+               // 尝试启动客户端,如果失败则启动重连循环
+               if err := r.client.Start(); err != nil {
+                       r.Logger.Error(err, "Failed to start transport client 
on first attempt, starting retry loop")
+
+                       // 在后台启动重连循环
+                       go func() {
+                               attempt := 0
+                               for {
+                                       attempt++
+                                       r.Logger.Info("Reconnection attempt", 
"attempt", attempt, "wait", "3s")
+                                       time.Sleep(3 * time.Second)
+
+                                       // 检查是否应该停止
+                                       select {
+                                       case <-ctx.Done():
+                                               r.Logger.Info("Reconnection 
loop stopped due to context cancellation")
+                                               return
+                                       default:
+                                       }
+
+                                       r.Logger.Info("Trying to connect to 
manager", "attempt", attempt, "addr", addr)
+                                       if err := r.client.Start(); err == nil {
+                                               r.Logger.Info("Successfully 
connected to manager", "attempt", attempt)
+                                               return
+                                       } else {
+                                               r.Logger.Error(err, "Connection 
failed, will retry", "attempt", attempt)
+                                       }
+                               }
+                       }()
+               }
        }
 
+       r.Logger.Info("Transport runner started successfully, connection will 
be established in background")
+
        // 创建新的context用于监控关闭信号
        ctx, cancel := context.WithCancel(ctx)
        defer cancel()
@@ -198,7 +208,9 @@ func (r *Runner) Start(ctx context.Context) error {
        go func() {
                <-ctx.Done()
                r.Logger.Info("Shutting down transport client...")
-               _ = r.client.Shutdown()
+               if r.client != nil {
+                       _ = r.client.Shutdown()
+               }
        }()
 
        // 阻塞直到 ctx.Done
diff --git a/internal/types/config/config_types.go 
b/internal/types/config/config_types.go
index ee8d8c5..4398cce 100644
--- a/internal/types/config/config_types.go
+++ b/internal/types/config/config_types.go
@@ -25,23 +25,24 @@ type CollectorSection struct {
        Info     CollectorInfo      `yaml:"info"`
        Log      CollectorLogConfig `yaml:"log"`
        Manager  ManagerConfig      `yaml:"manager"`
-       Identity string             `yaml:"identity" env:"IDENTITY"`
-       Mode     string             `yaml:"mode" env:"MODE"`
-       // Add Dispatcher if needed
+       Identity string             `yaml:"identity"`
+       Mode     string             `yaml:"mode"`
+       // todo dispatcher
 }
 
 type CollectorInfo struct {
-       Name string `yaml:"name"`
        IP   string `yaml:"ip"`
+       Name string `yaml:"name"`
        Port string `yaml:"port"`
 }
 
 type CollectorLogConfig struct {
+       // Level is one of: debug, info, warn, error, panic, panic, fatal
        Level string `yaml:"level"`
 }
 
 type ManagerConfig struct {
-       Host     string `yaml:"host" env:"MANAGER_HOST"`
-       Port     string `yaml:"port" env:"MANAGER_PORT"`
-       Protocol string `yaml:"protocol" env:"MANAGER_PROTOCOL"`
+       Host     string `yaml:"host"`
+       Port     string `yaml:"port"`
+       Protocol string `yaml:"protocol"`
 }
diff --git a/internal/util/logger/logger.go b/internal/util/logger/logger.go
index cc657eb..8ef1456 100644
--- a/internal/util/logger/logger.go
+++ b/internal/util/logger/logger.go
@@ -25,6 +25,7 @@ import (
        "github.com/go-logr/zapr"
        "go.uber.org/zap"
        "go.uber.org/zap/zapcore"
+
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
 )
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to