This is an automated email from the ASF dual-hosted git repository. shown pushed a commit to branch 1214-yuluo/cfg in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
commit 7b2984eb5b5300d07709d26e1b1b99f2e602a599 Author: yuluo-yx <[email protected]> AuthorDate: Sun Dec 14 14:56:12 2025 +0800 feat: optimize cfg and binary start Signed-off-by: yuluo-yx <[email protected]> --- etc/hertzbeat-collector.yaml | 2 +- go.mod | 1 + go.sum | 11 +- internal/config/config.go | 278 +++++++++++++++++++++----- internal/config/config_factory.go | 353 ---------------------------------- internal/config/env_config.go | 84 -------- internal/config/unified_config.go | 114 ----------- internal/constants/const.go | 80 ++++---- internal/transport/netty_client.go | 27 +-- internal/transport/processors.go | 8 +- internal/transport/transport.go | 166 ++++++++-------- internal/types/config/config_types.go | 15 +- internal/util/logger/logger.go | 1 + 13 files changed, 395 insertions(+), 745 deletions(-) diff --git a/etc/hertzbeat-collector.yaml b/etc/hertzbeat-collector.yaml index e67a1cd..5e87865 100644 --- a/etc/hertzbeat-collector.yaml +++ b/etc/hertzbeat-collector.yaml @@ -22,7 +22,7 @@ collector: port: 8080 log: - level: debug + level: info # Transport/Manager configuration manager: diff --git a/go.mod b/go.mod index df2342e..d840a0e 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/alicebob/miniredis/v2 v2.35.0 github.com/apache/arrow/go/v13 v13.0.0 github.com/expr-lang/expr v1.16.9 + github.com/fsnotify/fsnotify v1.4.7 github.com/go-logr/logr v1.4.3 github.com/go-logr/zapr v1.3.0 github.com/go-sql-driver/mysql v1.9.3 diff --git a/go.sum b/go.sum index 5ba8df6..d52801f 100644 --- a/go.sum +++ b/go.sum @@ -28,11 +28,11 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5 github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= @@ -58,6 +58,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -66,12 +68,11 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/expr-lang/expr v1.16.9 h1:WUAzmR0JNI9JCiF0/ewwHB1gmcGw5wW7nWt8gc6PpCI= github.com/expr-lang/expr v1.16.9/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4= github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gavv/httpexpect v2.0.0+incompatible/go.mod h1:x+9tiU1YnrOvnB725RkpoLv1M62hOWzwo5OXotisrKc= github.com/getsentry/sentry-go v0.12.0 h1:era7g0re5iY13bHSdN/xMkyV+5zZppjRVQhZrXCaEIk= @@ -256,11 +257,11 @@ github.com/prometheus/common v0.65.0 h1:QDwzd+G1twt//Kwj/Ww6E9FQq1iVMmODnILtW1t2 github.com/prometheus/common v0.65.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8= github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= +github.com/redis/go-redis/v9 v9.17.1 h1:7tl732FjYPRT9H9aNfyTwKg9iTETjWjGKEJ2t/5iWTs= +github.com/redis/go-redis/v9 v9.17.1/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= -github.com/redis/go-redis/v9 v9.17.1 h1:7tl732FjYPRT9H9aNfyTwKg9iTETjWjGKEJ2t/5iWTs= -github.com/redis/go-redis/v9 v9.17.1/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= diff --git a/internal/config/config.go b/internal/config/config.go index a5a299f..0770e44 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -18,110 +18,290 @@ package config import ( + "context" "errors" + "fmt" "os" + "path/filepath" + "sync" + "sync/atomic" + "time" + "github.com/fsnotify/fsnotify" "gopkg.in/yaml.v3" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config" + + "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" + cfgtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config" collectortypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/err" loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) -const ( - DefaultHertzBeatCollectorName = "hertzbeat-collector" +var ( + globalConfig atomic.Pointer[cfgtypes.CollectorConfig] + configMu sync.RWMutex ) -// Loader handles file-based configuration loading -// It uses the common ConfigFactory for consistency +// Loader handles file-based configuration loading with hot-reload support type Loader struct { cfgPath string - factory *ConfigFactory logger logger.Logger } +// New creates a new configuration loader func New(cfgPath string) *Loader { + return &Loader{ cfgPath: cfgPath, - factory: NewConfigFactory(), + logger: logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo).WithName("config-loader"), } } -func (l *Loader) LoadConfig() (*config.CollectorConfig, error) { - l.logger = logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo).WithName("config-loader") - +// LoadConfig loads configuration from file +func (l *Loader) LoadConfig() (*cfgtypes.CollectorConfig, error) { if l.cfgPath == "" { - l.logger.Info("file-config-loader: path is empty, using defaults") - return l.factory.CreateDefaultConfig(), nil + err := errors.New("config path is required") + l.logger.Error(err, "config path is empty") + return nil, err } - if _, err := os.Stat(l.cfgPath); os.IsNotExist(err) { - l.logger.Error(err, "file-config-loader: file not exist", "path", l.cfgPath) + cfg, err := l.parseConfigFile(l.cfgPath) + if err != nil { return nil, err } - file, err := os.Open(l.cfgPath) + l.logger.Info("configuration loaded successfully", "path", l.cfgPath) + return cfg, nil +} + +// parseConfigFile parses the YAML config file +func (l *Loader) parseConfigFile(path string) (*cfgtypes.CollectorConfig, error) { + // Resolve symlinks to handle Kubernetes ConfigMap mounts + resolved, err := filepath.EvalSymlinks(path) if err != nil { + resolved = path + } + + if _, err := os.Stat(resolved); os.IsNotExist(err) { + l.logger.Error(err, "config file not exist", "path", resolved) + return nil, err + } + + data, err := os.ReadFile(resolved) + if err != nil { + l.logger.Error(err, "failed to read config file", "path", resolved) return nil, err } - defer func(file *os.File) { - err := file.Close() - if err != nil { - l.logger.Error(err, "close config file failed") - } - }(file) - var cfg config.CollectorConfig - decoder := yaml.NewDecoder(file) - if err := decoder.Decode(&cfg); err != nil { - l.logger.Error(err, "decode config file failed") + var cfg cfgtypes.CollectorConfig + if err := yaml.Unmarshal(data, &cfg); err != nil { + l.logger.Error(err, "failed to parse config file", "path", resolved) return nil, err } - // Fill in any missing fields with defaults - filledCfg := l.factory.MergeWithEnv(&cfg) + // Apply default values + // when some fields are missing in config file + l.applyDefaults(&cfg) - l.logger.Info("Configuration loaded from file", "path", l.cfgPath) - return filledCfg, nil + return &cfg, nil } -func (l *Loader) ValidateConfig(cfg *config.CollectorConfig) error { - if l.factory != nil { - return l.factory.ValidateConfig(cfg) +// applyDefaults fills in missing configuration with defaults +func (l *Loader) applyDefaults(cfg *cfgtypes.CollectorConfig) { + if cfg.Collector.Info.Name == "" { + cfg.Collector.Info.Name = constants.DefaultHertzBeatCollectorName } + if cfg.Collector.Log.Level == "" { + cfg.Collector.Log.Level = string(loggertypes.LogLevelInfo) + } + if cfg.Collector.Manager.Protocol == "" { + cfg.Collector.Manager.Protocol = "http" + } +} - // Fallback validation if factory is not available +// ValidateConfig validates the configuration +func (l *Loader) ValidateConfig(cfg *cfgtypes.CollectorConfig) error { if cfg == nil { - l.logger.Error(collectortypes.CollectorConfigIsNil, "config-loader is nil") - return errors.New("config-loader is nil") + err := errors.New("config is nil") + l.logger.Error(collectortypes.CollectorConfigIsNil, "config validation failed") + return err } - // other check if cfg.Collector.Info.IP == "" { - l.logger.Error(collectortypes.CollectorIPIsNil, "collector ip is empty") - return errors.New("config-loader ip is empty") + err := errors.New("collector ip is empty") + l.logger.Error(collectortypes.CollectorIPIsNil, "config validation failed") + return err } if cfg.Collector.Info.Port == "" { - l.logger.Error(collectortypes.CollectorPortIsNil, "config-loader: port is empty") - return errors.New("config-loader port is empty") + err := errors.New("collector port is empty") + l.logger.Error(collectortypes.CollectorPortIsNil, "config validation failed") + return err } if cfg.Collector.Info.Name == "" { - l.logger.Sugar().Debug("config-loader: name is empty") - cfg.Collector.Info.Name = DefaultHertzBeatCollectorName + cfg.Collector.Info.Name = constants.DefaultHertzBeatCollectorName + l.logger.Sugar().Debug("collector name is empty, using default") } return nil } -// GetManagerAddress returns the full manager address using the factory -func (l *Loader) GetManagerAddress(cfg *config.CollectorConfig) string { - return l.factory.GetManagerAddress(cfg) +// GetManagerAddress returns the full manager address +func (l *Loader) GetManagerAddress(cfg *cfgtypes.CollectorConfig) string { + if cfg == nil || cfg.Collector.Manager.Host == "" { + return "" + } + protocol := cfg.Collector.Manager.Protocol + if protocol == "" { + protocol = "http" + } + return fmt.Sprintf("%s://%s:%s", protocol, cfg.Collector.Manager.Host, cfg.Collector.Manager.Port) +} + +// PrintConfig prints the configuration +func (l *Loader) PrintConfig(cfg *cfgtypes.CollectorConfig) { + if cfg == nil { + l.logger.Info("config is nil") + return + } + l.logger.Info("current configuration", + "name", cfg.Collector.Info.Name, + "ip", cfg.Collector.Info.IP, + "port", cfg.Collector.Info.Port, + "log_level", cfg.Collector.Log.Level, + "manager", l.GetManagerAddress(cfg), + ) +} + +// GetGlobalConfig returns the current global configuration +func GetGlobalConfig() *cfgtypes.CollectorConfig { + return globalConfig.Load() +} + +// SetGlobalConfig sets the global configuration +func SetGlobalConfig(cfg *cfgtypes.CollectorConfig) { + configMu.Lock() + defer configMu.Unlock() + globalConfig.Store(cfg) +} + +// WatchConfigAndReload watches the config file and reloads on changes +// This function implements hot-reload for both configuration and logging +func (l *Loader) WatchConfigAndReload(ctx context.Context) error { + watcher, err := fsnotify.NewWatcher() + if err != nil { + l.logger.Error(err, "failed to create config watcher") + return err + } + defer watcher.Close() + + // Watch both the file and its directory to handle symlink swaps (Kubernetes ConfigMap) + cfgFile := l.cfgPath + cfgDir := filepath.Dir(cfgFile) + + if err := watcher.Add(cfgDir); err != nil { + l.logger.Error(err, "failed to watch config directory", "dir", cfgDir) + return err + } + + // Try to watch the file directly (best-effort) + _ = watcher.Add(cfgFile) + + l.logger.Info("config file watcher started", "path", cfgFile) + + // Debounce events + var ( + pending bool + last time.Time + ) + + reload := func() { + l.logger.Info("config file changed, reloading...") + + // Parse new configuration + newCfg, err := l.parseConfigFile(cfgFile) + if err != nil { + l.logger.Error(err, "failed to reload config") + return + } + + // Validate new configuration + if err := l.ValidateConfig(newCfg); err != nil { + l.logger.Error(err, "invalid config after reload") + return + } + + // Update global configuration + SetGlobalConfig(newCfg) + + // Hot-reload logging configuration + if err := l.reloadLogging(newCfg); err != nil { + l.logger.Error(err, "failed to reload logging") + } + + l.logger.Info("configuration reloaded successfully") + } + + for { + select { + case <-ctx.Done(): + l.logger.Info("config watcher stopped") + return ctx.Err() + + case event, ok := <-watcher.Events: + if !ok { + return nil + } + + // Handle Write, Create, Remove, Rename, Chmod events + if event.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Remove|fsnotify.Rename|fsnotify.Chmod) != 0 { + // Check if the event pertains to the config file or directory + if filepath.Base(event.Name) == filepath.Base(cfgFile) || filepath.Dir(event.Name) == cfgDir { + // Debounce: if not pending or enough time has passed + if !pending || time.Since(last) > 250*time.Millisecond { + pending = true + last = time.Now() + // Slight delay to let file settle + go func() { + time.Sleep(300 * time.Millisecond) + reload() + pending = false + }() + } + } + } + + case err, ok := <-watcher.Errors: + if !ok { + return nil + } + l.logger.Error(err, "config watcher error") + } + } } -// PrintConfig prints the configuration using the factory -func (l *Loader) PrintConfig(cfg *config.CollectorConfig) { - l.factory.PrintConfig(cfg) +// reloadLogging reloads the logging configuration +// when config file changes, it helps dynamically +// adjust the log level for dynamic debugging +func (l *Loader) reloadLogging(cfg *cfgtypes.CollectorConfig) error { + + if cfg == nil { + return errors.New("config is nil") + } + + // Parse log level from config + level := loggertypes.LogLevel(cfg.Collector.Log.Level) + if level == "" { + level = loggertypes.LogLevelInfo + } + + // Create new logger with updated level + newLogger := logger.DefaultLogger(os.Stdout, level).WithName("config-loader") + + // Update loader's logger + l.logger = newLogger + + l.logger.Info("logging configuration reloaded", "level", level) + return nil } diff --git a/internal/config/config_factory.go b/internal/config/config_factory.go deleted file mode 100644 index 7854d4f..0000000 --- a/internal/config/config_factory.go +++ /dev/null @@ -1,353 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package config - -import ( - "fmt" - "os" - "strconv" - "strings" - - "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config" - loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" -) - -// ConfigDefaults contains all default configuration values -type ConfigDefaults struct { - Identity string - Mode string - CollectorName string - CollectorIP string - CollectorPort string - ManagerHost string - ManagerPort string - ManagerProtocol string - LogLevel string -} - -// GetDefaultConfig returns the default configuration values -func GetDefaultConfig() *ConfigDefaults { - return &ConfigDefaults{ - Identity: "hertzbeat-collector-go", - Mode: "public", - CollectorName: "hertzbeat-collector-go", - CollectorIP: "127.0.0.1", - CollectorPort: "8080", - ManagerHost: "127.0.0.1", - ManagerPort: "1158", - ManagerProtocol: "netty", - LogLevel: "info", - } -} - -// ConfigFactory provides factory methods for creating configurations -type ConfigFactory struct { - defaults *ConfigDefaults - logger logger.Logger -} - -// NewConfigFactory creates a new configuration factory -func NewConfigFactory() *ConfigFactory { - return &ConfigFactory{ - defaults: GetDefaultConfig(), - logger: logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo).WithName("config-factory"), - } -} - -// CreateDefaultConfig creates a configuration with default values -func (f *ConfigFactory) CreateDefaultConfig() *config.CollectorConfig { - return &config.CollectorConfig{ - Collector: config.CollectorSection{ - Info: config.CollectorInfo{ - Name: f.defaults.CollectorName, - IP: f.defaults.CollectorIP, - Port: f.defaults.CollectorPort, - }, - Log: config.CollectorLogConfig{ - Level: f.defaults.LogLevel, - }, - Manager: config.ManagerConfig{ - Host: f.defaults.ManagerHost, - Port: f.defaults.ManagerPort, - Protocol: f.defaults.ManagerProtocol, - }, - Identity: f.defaults.Identity, - Mode: f.defaults.Mode, - }, - } -} - -// CreateFromEnv creates configuration from environment variables with fallback to defaults -func (f *ConfigFactory) CreateFromEnv() *config.CollectorConfig { - cfg := f.CreateDefaultConfig() - - // Override with environment variables if they exist - if identity := os.Getenv("IDENTITY"); identity != "" { - cfg.Collector.Identity = identity - } - - if mode := os.Getenv("MODE"); mode != "" { - cfg.Collector.Mode = mode - } - - if name := os.Getenv("COLLECTOR_NAME"); name != "" { - cfg.Collector.Info.Name = name - } - - if ip := os.Getenv("COLLECTOR_IP"); ip != "" { - cfg.Collector.Info.IP = ip - } - - if port := os.Getenv("COLLECTOR_PORT"); port != "" { - cfg.Collector.Info.Port = port - } - - if host := os.Getenv("MANAGER_HOST"); host != "" { - cfg.Collector.Manager.Host = host - } - - if port := os.Getenv("MANAGER_PORT"); port != "" { - cfg.Collector.Manager.Port = port - } - - if protocol := os.Getenv("MANAGER_PROTOCOL"); protocol != "" { - cfg.Collector.Manager.Protocol = protocol - } - - if level := os.Getenv("LOG_LEVEL"); level != "" { - cfg.Collector.Log.Level = level - } - - f.logger.Info("Configuration created from environment variables", - "identity", cfg.Collector.Identity, - "mode", cfg.Collector.Mode, - "manager", fmt.Sprintf("%s:%s", cfg.Collector.Manager.Host, cfg.Collector.Manager.Port)) - - return cfg -} - -// MergeWithEnv merges file configuration with environment variable overrides -func (f *ConfigFactory) MergeWithEnv(fileCfg *config.CollectorConfig) *config.CollectorConfig { - if fileCfg == nil { - return f.CreateFromEnv() - } - - // Start with file configuration - cfg := *fileCfg - - // Override with environment variables if they exist - if identity := os.Getenv("IDENTITY"); identity != "" { - cfg.Collector.Identity = identity - } - - if mode := os.Getenv("MODE"); mode != "" { - cfg.Collector.Mode = mode - } - - if name := os.Getenv("COLLECTOR_NAME"); name != "" { - cfg.Collector.Info.Name = name - } - - if ip := os.Getenv("COLLECTOR_IP"); ip != "" { - cfg.Collector.Info.IP = ip - } - - if port := os.Getenv("COLLECTOR_PORT"); port != "" { - cfg.Collector.Info.Port = port - } - - if host := os.Getenv("MANAGER_HOST"); host != "" { - cfg.Collector.Manager.Host = host - } - - if port := os.Getenv("MANAGER_PORT"); port != "" { - cfg.Collector.Manager.Port = port - } - - if protocol := os.Getenv("MANAGER_PROTOCOL"); protocol != "" { - cfg.Collector.Manager.Protocol = protocol - } - - if level := os.Getenv("LOG_LEVEL"); level != "" { - cfg.Collector.Log.Level = level - } - - // Fill in defaults for any empty fields - f.fillDefaults(&cfg) - - f.logger.Info("Configuration merged with environment variables") - return &cfg -} - -// fillDefaults fills in default values for any empty fields -func (f *ConfigFactory) fillDefaults(cfg *config.CollectorConfig) { - if cfg.Collector.Identity == "" { - cfg.Collector.Identity = f.defaults.Identity - } - - if cfg.Collector.Mode == "" { - cfg.Collector.Mode = f.defaults.Mode - } - - if cfg.Collector.Info.Name == "" { - cfg.Collector.Info.Name = f.defaults.CollectorName - } - - if cfg.Collector.Info.IP == "" { - cfg.Collector.Info.IP = f.defaults.CollectorIP - } - - if cfg.Collector.Info.Port == "" { - cfg.Collector.Info.Port = f.defaults.CollectorPort - } - - if cfg.Collector.Manager.Host == "" { - cfg.Collector.Manager.Host = f.defaults.ManagerHost - } - - if cfg.Collector.Manager.Port == "" { - cfg.Collector.Manager.Port = f.defaults.ManagerPort - } - - if cfg.Collector.Manager.Protocol == "" { - cfg.Collector.Manager.Protocol = f.defaults.ManagerProtocol - } - - if cfg.Collector.Log.Level == "" { - cfg.Collector.Log.Level = f.defaults.LogLevel - } -} - -// ValidateConfig validates the configuration -func (f *ConfigFactory) ValidateConfig(cfg *config.CollectorConfig) error { - if cfg == nil { - return fmt.Errorf("configuration is nil") - } - - // Validate required fields - if cfg.Collector.Identity == "" { - return fmt.Errorf("collector identity is required") - } - - if cfg.Collector.Mode == "" { - return fmt.Errorf("collector mode is required") - } - - if cfg.Collector.Manager.Host == "" { - return fmt.Errorf("manager host is required") - } - - if cfg.Collector.Manager.Port == "" { - return fmt.Errorf("manager port is required") - } - - // Validate mode - validModes := map[string]bool{ - "public": true, - "private": true, - } - if !validModes[strings.ToLower(cfg.Collector.Mode)] { - return fmt.Errorf("invalid mode: %s, must be 'public' or 'private'", cfg.Collector.Mode) - } - - // Validate protocol - validProtocols := map[string]bool{ - "netty": true, - "grpc": true, - } - if cfg.Collector.Manager.Protocol != "" && !validProtocols[strings.ToLower(cfg.Collector.Manager.Protocol)] { - return fmt.Errorf("invalid protocol: %s, must be 'netty' or 'grpc'", cfg.Collector.Manager.Protocol) - } - - // Validate port numbers - if _, err := strconv.Atoi(cfg.Collector.Info.Port); err != nil { - return fmt.Errorf("invalid collector port: %s", cfg.Collector.Info.Port) - } - - if _, err := strconv.Atoi(cfg.Collector.Manager.Port); err != nil { - return fmt.Errorf("invalid manager port: %s", cfg.Collector.Manager.Port) - } - - f.logger.Info("Configuration validation passed") - return nil -} - -// GetManagerAddress returns the full manager address (host:port) -func (f *ConfigFactory) GetManagerAddress(cfg *config.CollectorConfig) string { - if cfg == nil || cfg.Collector.Manager.Host == "" || cfg.Collector.Manager.Port == "" { - return fmt.Sprintf("%s:%s", f.defaults.ManagerHost, f.defaults.ManagerPort) - } - return fmt.Sprintf("%s:%s", cfg.Collector.Manager.Host, cfg.Collector.Manager.Port) -} - -// IsPublicMode checks if the collector is running in public mode -func (f *ConfigFactory) IsPublicMode(cfg *config.CollectorConfig) bool { - if cfg == nil { - return strings.ToLower(f.defaults.Mode) == "public" - } - return strings.ToLower(cfg.Collector.Mode) == "public" -} - -// IsPrivateMode checks if the collector is running in private mode -func (f *ConfigFactory) IsPrivateMode(cfg *config.CollectorConfig) bool { - if cfg == nil { - return strings.ToLower(f.defaults.Mode) == "private" - } - return strings.ToLower(cfg.Collector.Mode) == "private" -} - -// PrintConfig prints the configuration in a readable format -func (f *ConfigFactory) PrintConfig(cfg *config.CollectorConfig) { - if cfg == nil { - f.logger.Error(fmt.Errorf("configuration is nil"), "Configuration is nil") - return - } - - f.logger.Info("=== Collector Configuration ===") - f.logger.Info("Identity", "value", cfg.Collector.Identity) - f.logger.Info("Mode", "value", cfg.Collector.Mode) - f.logger.Info("Collector Name", "value", cfg.Collector.Info.Name) - f.logger.Info("Collector Address", "value", fmt.Sprintf("%s:%s", cfg.Collector.Info.IP, cfg.Collector.Info.Port)) - f.logger.Info("Manager Address", "value", fmt.Sprintf("%s:%s", cfg.Collector.Manager.Host, cfg.Collector.Manager.Port)) - f.logger.Info("Manager Protocol", "value", cfg.Collector.Manager.Protocol) - f.logger.Info("Log Level", "value", cfg.Collector.Log.Level) - f.logger.Info("===============================") -} - -// === Configuration Entry Points === - -// LoadFromFile loads configuration from file only -func LoadFromFile(cfgPath string) (*config.CollectorConfig, error) { - loader := New(cfgPath) - return loader.LoadConfig() -} - -// LoadFromEnv loads configuration from environment variables only -func LoadFromEnv() *config.CollectorConfig { - envLoader := NewEnvConfigLoader() - return envLoader.LoadFromEnv() -} - -// LoadUnified loads configuration with file + env priority (recommended) -func LoadUnified(cfgPath string) (*config.CollectorConfig, error) { - unifiedLoader := NewUnifiedConfigLoader(cfgPath) - return unifiedLoader.Load() -} diff --git a/internal/config/env_config.go b/internal/config/env_config.go deleted file mode 100644 index 40c09c1..0000000 --- a/internal/config/env_config.go +++ /dev/null @@ -1,84 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package config - -import ( - "os" - - "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config" - loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" -) - -// EnvConfigLoader handles environment variable configuration -// It uses the common ConfigFactory for consistency -type EnvConfigLoader struct { - factory *ConfigFactory - logger logger.Logger -} - -// NewEnvConfigLoader creates a new environment variable configuration loader -func NewEnvConfigLoader() *EnvConfigLoader { - return &EnvConfigLoader{ - factory: NewConfigFactory(), - logger: logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo).WithName("env-config-loader"), - } -} - -// LoadFromEnv loads configuration from environment variables with defaults -func (l *EnvConfigLoader) LoadFromEnv() *config.CollectorConfig { - cfg := l.factory.CreateFromEnv() - l.logger.Info("Configuration loaded from environment variables") - return cfg -} - -// LoadWithDefaults loads configuration with environment variables overriding provided defaults -func (l *EnvConfigLoader) LoadWithDefaults(defaultCfg *config.CollectorConfig) *config.CollectorConfig { - if defaultCfg == nil { - return l.LoadFromEnv() - } - - cfg := l.factory.MergeWithEnv(defaultCfg) - l.logger.Info("Configuration loaded with environment variable overrides") - return cfg -} - -// ValidateEnvConfig validates the environment configuration -func (l *EnvConfigLoader) ValidateEnvConfig(cfg *config.CollectorConfig) error { - return l.factory.ValidateConfig(cfg) -} - -// PrintEnvConfig prints the current environment configuration -func (l *EnvConfigLoader) PrintEnvConfig(cfg *config.CollectorConfig) { - l.factory.PrintConfig(cfg) -} - -// GetManagerAddress returns the full manager address (host:port) -func (l *EnvConfigLoader) GetManagerAddress(cfg *config.CollectorConfig) string { - return l.factory.GetManagerAddress(cfg) -} - -// IsPublicMode checks if the collector is running in public mode -func (l *EnvConfigLoader) IsPublicMode(cfg *config.CollectorConfig) bool { - return l.factory.IsPublicMode(cfg) -} - -// IsPrivateMode checks if the collector is running in private mode -func (l *EnvConfigLoader) IsPrivateMode(cfg *config.CollectorConfig) bool { - return l.factory.IsPrivateMode(cfg) -} diff --git a/internal/config/unified_config.go b/internal/config/unified_config.go deleted file mode 100644 index 2640448..0000000 --- a/internal/config/unified_config.go +++ /dev/null @@ -1,114 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package config - -import ( - "os" - - "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config" - loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" -) - -// UnifiedConfigLoader provides a unified configuration loading interface -// that loads from file first, then overrides with environment variables -// It uses the common ConfigFactory for consistency -type UnifiedConfigLoader struct { - fileLoader *Loader - envLoader *EnvConfigLoader - factory *ConfigFactory - logger logger.Logger -} - -// NewUnifiedConfigLoader creates a new unified configuration loader -func NewUnifiedConfigLoader(cfgPath string) *UnifiedConfigLoader { - return &UnifiedConfigLoader{ - fileLoader: New(cfgPath), - envLoader: NewEnvConfigLoader(), - factory: NewConfigFactory(), - logger: logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo).WithName("unified-config-loader"), - } -} - -// Load loads configuration from file and environment variables -// Environment variables take precedence over file configuration -func (l *UnifiedConfigLoader) Load() (*config.CollectorConfig, error) { - var cfg *config.CollectorConfig - var err error - - // Try to load from file first - if l.fileLoader.cfgPath != "" { - if cfg, err = l.fileLoader.LoadConfig(); err != nil { - l.logger.Error(err, "Failed to load configuration from file, falling back to environment variables only") - cfg = nil - } else { - l.logger.Info("Configuration loaded from file", "file", l.fileLoader.cfgPath) - } - } - - // If file loading failed or no file specified, start with defaults - if cfg == nil { - cfg = l.factory.CreateDefaultConfig() - l.logger.Info("Using default configuration") - } - - // Merge with environment variables (env takes precedence) - finalCfg := l.factory.MergeWithEnv(cfg) - - // Validate the final configuration - if err := l.factory.ValidateConfig(finalCfg); err != nil { - l.logger.Error(err, "Configuration validation failed") - return nil, err - } - - l.logger.Info("Unified configuration loaded successfully") - return finalCfg, nil -} - -// LoadEnvOnly loads configuration from environment variables only -func (l *UnifiedConfigLoader) LoadEnvOnly() *config.CollectorConfig { - cfg := l.envLoader.LoadFromEnv() - l.logger.Info("Configuration loaded from environment variables only") - return cfg -} - -// LoadFileOnly loads configuration from file only (without env overrides) -func (l *UnifiedConfigLoader) LoadFileOnly() (*config.CollectorConfig, error) { - if l.fileLoader.cfgPath == "" { - cfg := l.factory.CreateDefaultConfig() - l.logger.Info("No file specified, using default configuration") - return cfg, nil - } - - return l.fileLoader.LoadConfig() -} - -// ValidateConfig validates the configuration using the factory -func (l *UnifiedConfigLoader) ValidateConfig(cfg *config.CollectorConfig) error { - return l.factory.ValidateConfig(cfg) -} - -// PrintConfig prints the configuration using the factory -func (l *UnifiedConfigLoader) PrintConfig(cfg *config.CollectorConfig) { - l.factory.PrintConfig(cfg) -} - -// GetManagerAddress returns the manager address using the factory -func (l *UnifiedConfigLoader) GetManagerAddress(cfg *config.CollectorConfig) string { - return l.factory.GetManagerAddress(cfg) -} diff --git a/internal/constants/const.go b/internal/constants/const.go index d6ac8de..cd68a14 100644 --- a/internal/constants/const.go +++ b/internal/constants/const.go @@ -19,72 +19,76 @@ // ported from org.apache.hertzbeat.common.constants.CommonConstants package constants +const ( + DefaultHertzBeatCollectorName = "hertzbeat-collector" +) + // Collect Response status code (Go Collector specific) const ( - CollectSuccess = 0 // Generic success - CollectUnavailable = 1 // Service unavailable (e.g., target service error) - CollectUnReachable = 2 // Network unreachable (e.g., network timeout, DNS fail) - CollectUnConnectable = 3 // Connection failed (e.g., port closed, SSH auth fail) - CollectFail = 4 // Generic failure (e.g., query error, script exec fail) - CollectTimeout = 5 // Collection timeout + CollectSuccess = 0 // Generic success + CollectUnavailable = 1 // Service unavailable (e.g., target service error) + CollectUnReachable = 2 // Network unreachable (e.g., network timeout, DNS fail) + CollectUnConnectable = 3 // Connection failed (e.g., port closed, SSH auth fail) + CollectFail = 4 // Generic failure (e.g., query error, script exec fail) + CollectTimeout = 5 // Collection timeout ) // Field parameter types (Aligned with CommonConstants.java) const ( - TYPE_NUMBER = 0 // Field parameter type: number - TYPE_STRING = 1 // Field parameter type: String - TYPE_SECRET = 2 // Field parameter type: encrypted string - TYPE_TIME = 3 // Field parameter type: time + TYPE_NUMBER = 0 // Field parameter type: number + TYPE_STRING = 1 // Field parameter type: String + TYPE_SECRET = 2 // Field parameter type: encrypted string + TYPE_TIME = 3 // Field parameter type: time ) // Monitoring status (Aligned with CommonConstants.java) const ( - MONITOR_PAUSED_CODE = 0 // 0: Paused - MONITOR_UP_CODE = 1 // 1: Up - MONITOR_DOWN_CODE = 2 // 2: Down + MONITOR_PAUSED_CODE = 0 // 0: Paused + MONITOR_UP_CODE = 1 // 1: Up + MONITOR_DOWN_CODE = 2 // 2: Down ) // Common metric/label keys const ( - // Collection metric value: null placeholder for empty value - NULL_VALUE = " " + // Collection metric value: null placeholder for empty value + NULL_VALUE = " " - // Common metric keys - ErrorMsg = "errorMsg" - RESPONSE_TIME = "responseTime" - StatusCode = "statusCode" + // Common metric keys + ErrorMsg = "errorMsg" + RESPONSE_TIME = "responseTime" + StatusCode = "statusCode" - // Label keys (Aligned with CommonConstants.java) - LABEL_INSTANCE = "instance" - LABEL_DEFINE_ID = "defineid" - LABEL_ALERT_NAME = "alertname" - LABEL_INSTANCE_HOST = "instancehost" - LABEL_INSTANCE_NAME = "instancename" - LABEL_ALERT_SEVERITY = "severity" + // Label keys (Aligned with CommonConstants.java) + LABEL_INSTANCE = "instance" + LABEL_DEFINE_ID = "defineid" + LABEL_ALERT_NAME = "alertname" + LABEL_INSTANCE_HOST = "instancehost" + LABEL_INSTANCE_NAME = "instancename" + LABEL_ALERT_SEVERITY = "severity" ) // Service specific constants const ( - MongoDbAtlasModel = "mongodb-atlas" + MongoDbAtlasModel = "mongodb-atlas" - // PostgreSQLUnReachAbleCode Specific SQLState for connection failure - PostgreSQLUnReachAbleCode = "08001" - // ZookeeperApp App name for Zookeeper - ZookeeperApp = "zookeeper" - // ZookeeperEnviHeader Header string in Zookeeper 'envi' command output - ZookeeperEnviHeader = "Environment:" + // PostgreSQLUnReachAbleCode Specific SQLState for connection failure + PostgreSQLUnReachAbleCode = "08001" + // ZookeeperApp App name for Zookeeper + ZookeeperApp = "zookeeper" + // ZookeeperEnviHeader Header string in Zookeeper 'envi' command output + ZookeeperEnviHeader = "Environment:" ) // Function related constants const ( - CollectorModule = "collector" + CollectorModule = "collector" ) // Legacy or alias constants // These are kept for compatibility with previous Go code logic const ( - // FieldTypeString Alias for TYPE_STRING - FieldTypeString = TYPE_STRING - // KeyWord Deprecated placeholder - KeyWord = "keyword" + // FieldTypeString Alias for TYPE_STRING + FieldTypeString = TYPE_STRING + // KeyWord Deprecated placeholder + KeyWord = "keyword" ) diff --git a/internal/transport/netty_client.go b/internal/transport/netty_client.go index ebaeade..ccdd0d0 100644 --- a/internal/transport/netty_client.go +++ b/internal/transport/netty_client.go @@ -434,27 +434,28 @@ func (c *NettyClient) connectionMonitor() { if !connExists { // Connection is nil, trigger disconnect event c.triggerEvent(EventDisconnected, nil) - log.Println("Connection lost, attempting to reconnect...") + log.Println("Connection lost, starting reconnection loop...") _ = c.Shutdown() - // Attempt to reconnect with exponential backoff - for i := 0; i < 5; i++ { + // Attempt to reconnect continuously with fixed 3-second interval + attempt := 0 + for c.IsStarted() { + attempt++ + log.Printf("Reconnection attempt #%d, will retry in 3 seconds...", attempt) + time.Sleep(3 * time.Second) + if !c.IsStarted() { - break // Exit if shutdown was called + log.Println("Client shutdown requested, stopping reconnection attempts") + break } - backoff := time.Duration(i+1) * 2 * time.Second - log.Printf("Attempting to reconnect in %v...", backoff) - time.Sleep(backoff) - + log.Printf("Reconnection attempt #%d: trying to connect to %s...", attempt, c.addr) if err := c.Start(); err == nil { - log.Println("Reconnected successfully") + log.Printf("Reconnection attempt #%d: SUCCESS - Connected to manager", attempt) break } else { - log.Printf("Failed to reconnect: %v", err) - if i == 4 { // Last attempt - c.triggerEvent(EventConnectFailed, err) - } + log.Printf("Reconnection attempt #%d: FAILED - %v", attempt, err) + c.triggerEvent(EventConnectFailed, err) } } } diff --git a/internal/transport/processors.go b/internal/transport/processors.go index d969f48..f989861 100644 --- a/internal/transport/processors.go +++ b/internal/transport/processors.go @@ -26,7 +26,7 @@ import ( pb "hertzbeat.apache.org/hertzbeat-collector-go/api" jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" - loggertype "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" + loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/crypto" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param" @@ -87,7 +87,7 @@ func NewGoOnlineProcessor(client *GrpcClient) *GoOnlineProcessor { } func (p *GoOnlineProcessor) Process(msg *pb.Message) (*pb.Message, error) { - log := logger.DefaultLogger(os.Stdout, loggertype.LogLevelInfo).WithName("go-online-processor") + log := logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo).WithName("go-online-processor") // Handle go online message - parse ServerInfo and extract AES secret log.Info("received GO_ONLINE message from manager") @@ -188,7 +188,7 @@ func NewCollectCyclicDataProcessor(client *GrpcClient, scheduler JobScheduler) * } func (p *CollectCyclicDataProcessor) Process(msg *pb.Message) (*pb.Message, error) { - log := logger.DefaultLogger(os.Stdout, loggertype.LogLevelInfo).WithName("cyclic-task-processor") + log := logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo).WithName("cyclic-task-processor") log.Info("processing cyclic task message", "identity", msg.Identity) if p.scheduler == nil { @@ -229,7 +229,7 @@ func (p *CollectCyclicDataProcessor) Process(msg *pb.Message) (*pb.Message, erro "monitorID", job.MonitorID, "app", job.App) - // Add job to scheduler + // Add job to scheduler - convert to interface{} for compatibility if err := p.scheduler.AddAsyncCollectJob(&job); err != nil { log.Error(err, "Failed to add cyclic job to scheduler: %v") return &pb.Message{ diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 807630a..8d0c155 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -22,9 +22,9 @@ import ( "encoding/json" "fmt" "os" + "time" pb "hertzbeat.apache.org/hertzbeat-collector-go/api" - config2 "hertzbeat.apache.org/hertzbeat-collector-go/internal/config" clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server" "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector" configtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config" @@ -72,24 +72,6 @@ func NewFromConfig(cfg *configtypes.CollectorConfig) *Runner { return New(cfg) } -// NewFromEnv creates a new transport runner from environment variables -func NewFromEnv() *Runner { - envLoader := config2.NewEnvConfigLoader() - cfg := envLoader.LoadFromEnv() - return NewFromConfig(cfg) -} - -// NewFromUnifiedConfig creates a new transport runner using unified configuration loading -// It loads from file first, then overrides with environment variables -func NewFromUnifiedConfig(cfgPath string) (*Runner, error) { - unifiedLoader := config2.NewUnifiedConfigLoader(cfgPath) - cfg, err := unifiedLoader.Load() - if err != nil { - return nil, err - } - return NewFromConfig(cfg), nil -} - func (r *Runner) Start(ctx context.Context) error { // 初始化 Logger 如果它还没有被设置 if r.Logger.IsZero() { @@ -125,71 +107,99 @@ func (r *Runner) Start(ctx context.Context) error { factory := &TransportClientFactory{} client, err := factory.CreateClient(protocol, addr) if err != nil { - r.Logger.Error(err, "Failed to create transport client") - return err - } - - // Set the identity on the client if it supports it - identity := r.Config.Collector.Identity - if identity == "" { - identity = DefaultIdentity - } + r.Logger.Error(err, "Failed to create transport client, will retry in background") + // 不直接返回错误,而是继续启动,后续会在后台重试连接 + } else { + // Set the identity on the client if it supports it + identity := r.Config.Collector.Identity + if identity == "" { + identity = DefaultIdentity + } - if nettyClient, ok := client.(*NettyClient); ok { - nettyClient.SetIdentity(identity) - } + if nettyClient, ok := client.(*NettyClient); ok { + nettyClient.SetIdentity(identity) + } - r.client = client - - // 设置事件处理器 - switch c := client.(type) { - case *GrpcClient: - c.SetEventHandler(func(event Event) { - switch event.Type { - case EventConnected: - r.Logger.Info("Connected to manager gRPC server", "addr", event.Address) - go r.sendOnlineMessage() - case EventDisconnected: - r.Logger.Info("Disconnected from manager gRPC server", "addr", event.Address) - case EventConnectFailed: - r.Logger.Error(event.Error, "Failed to connect to manager gRPC server", "addr", event.Address) + r.client = client + + // 设置事件处理器 + switch c := client.(type) { + case *GrpcClient: + c.SetEventHandler(func(event Event) { + switch event.Type { + case EventConnected: + r.Logger.Info("Connected to manager gRPC server", "addr", event.Address) + go r.sendOnlineMessage() + case EventDisconnected: + r.Logger.Info("Disconnected from manager gRPC server", "addr", event.Address) + case EventConnectFailed: + r.Logger.Error(event.Error, "Failed to connect to manager gRPC server, will retry", "addr", event.Address) + } + }) + // Register processors with job scheduler + if r.jobScheduler != nil { + RegisterDefaultProcessors(c, r.jobScheduler) + r.Logger.Info("Registered gRPC processors with job scheduler") + } else { + RegisterDefaultProcessors(c, nil) + r.Logger.Info("Registered gRPC processors without job scheduler") } - }) - // Register processors with job scheduler - if r.jobScheduler != nil { - RegisterDefaultProcessors(c, r.jobScheduler) - r.Logger.Info("Registered gRPC processors with job scheduler") - } else { - RegisterDefaultProcessors(c, nil) - r.Logger.Info("Registered gRPC processors without job scheduler") - } - case *NettyClient: - c.SetEventHandler(func(event Event) { - switch event.Type { - case EventConnected: - r.Logger.Info("Connected to manager netty server", "addr", event.Address) - go r.sendOnlineMessage() - case EventDisconnected: - r.Logger.Info("Disconnected from manager netty server", "addr", event.Address) - case EventConnectFailed: - r.Logger.Error(event.Error, "Failed to connect to manager netty server", "addr", event.Address) + case *NettyClient: + c.SetEventHandler(func(event Event) { + switch event.Type { + case EventConnected: + r.Logger.Info("Connected to manager netty server", "addr", event.Address) + go r.sendOnlineMessage() + case EventDisconnected: + r.Logger.Info("Disconnected from manager netty server", "addr", event.Address) + case EventConnectFailed: + r.Logger.Error(event.Error, "Failed to connect to manager netty server, will retry", "addr", event.Address) + } + }) + // Register processors with job scheduler + if r.jobScheduler != nil { + RegisterDefaultNettyProcessors(c, r.jobScheduler) + r.Logger.Info("Registered netty processors with job scheduler") + } else { + RegisterDefaultNettyProcessors(c, nil) + r.Logger.Info("Registered netty processors without job scheduler") } - }) - // Register processors with job scheduler - if r.jobScheduler != nil { - RegisterDefaultNettyProcessors(c, r.jobScheduler) - r.Logger.Info("Registered netty processors with job scheduler") - } else { - RegisterDefaultNettyProcessors(c, nil) - r.Logger.Info("Registered netty processors without job scheduler") } - } - if err := r.client.Start(); err != nil { - r.Logger.Error(err, "Failed to start transport client") - return err + // 尝试启动客户端,如果失败则启动重连循环 + if err := r.client.Start(); err != nil { + r.Logger.Error(err, "Failed to start transport client on first attempt, starting retry loop") + + // 在后台启动重连循环 + go func() { + attempt := 0 + for { + attempt++ + r.Logger.Info("Reconnection attempt", "attempt", attempt, "wait", "3s") + time.Sleep(3 * time.Second) + + // 检查是否应该停止 + select { + case <-ctx.Done(): + r.Logger.Info("Reconnection loop stopped due to context cancellation") + return + default: + } + + r.Logger.Info("Trying to connect to manager", "attempt", attempt, "addr", addr) + if err := r.client.Start(); err == nil { + r.Logger.Info("Successfully connected to manager", "attempt", attempt) + return + } else { + r.Logger.Error(err, "Connection failed, will retry", "attempt", attempt) + } + } + }() + } } + r.Logger.Info("Transport runner started successfully, connection will be established in background") + // 创建新的context用于监控关闭信号 ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -198,7 +208,9 @@ func (r *Runner) Start(ctx context.Context) error { go func() { <-ctx.Done() r.Logger.Info("Shutting down transport client...") - _ = r.client.Shutdown() + if r.client != nil { + _ = r.client.Shutdown() + } }() // 阻塞直到 ctx.Done diff --git a/internal/types/config/config_types.go b/internal/types/config/config_types.go index ee8d8c5..4398cce 100644 --- a/internal/types/config/config_types.go +++ b/internal/types/config/config_types.go @@ -25,23 +25,24 @@ type CollectorSection struct { Info CollectorInfo `yaml:"info"` Log CollectorLogConfig `yaml:"log"` Manager ManagerConfig `yaml:"manager"` - Identity string `yaml:"identity" env:"IDENTITY"` - Mode string `yaml:"mode" env:"MODE"` - // Add Dispatcher if needed + Identity string `yaml:"identity"` + Mode string `yaml:"mode"` + // todo dispatcher } type CollectorInfo struct { - Name string `yaml:"name"` IP string `yaml:"ip"` + Name string `yaml:"name"` Port string `yaml:"port"` } type CollectorLogConfig struct { + // Level is one of: debug, info, warn, error, panic, panic, fatal Level string `yaml:"level"` } type ManagerConfig struct { - Host string `yaml:"host" env:"MANAGER_HOST"` - Port string `yaml:"port" env:"MANAGER_PORT"` - Protocol string `yaml:"protocol" env:"MANAGER_PROTOCOL"` + Host string `yaml:"host"` + Port string `yaml:"port"` + Protocol string `yaml:"protocol"` } diff --git a/internal/util/logger/logger.go b/internal/util/logger/logger.go index cc657eb..8ef1456 100644 --- a/internal/util/logger/logger.go +++ b/internal/util/logger/logger.go @@ -25,6 +25,7 @@ import ( "github.com/go-logr/zapr" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" ) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
