This is an automated email from the ASF dual-hosted git repository. shown pushed a commit to branch 1217-yuluo/add-o11y in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
commit 4a6917188e31d153bddda6c03f2f2dd4057ef704 Author: yuluo-yx <[email protected]> AuthorDate: Wed Dec 17 23:46:43 2025 +0800 feat: add base metrics endpoint Signed-off-by: yuluo-yx <[email protected]> --- etc/hertzbeat-collector.yaml | 3 + internal/cmd/server.go | 10 ++- internal/metrics/metrics.go | 134 ++++++++++++++++++++++++++++++++++ internal/transport/transport.go | 96 +++++++++++------------- internal/types/config/config_types.go | 16 ++-- 5 files changed, 199 insertions(+), 60 deletions(-) diff --git a/etc/hertzbeat-collector.yaml b/etc/hertzbeat-collector.yaml index 5e87865..c9893c1 100644 --- a/etc/hertzbeat-collector.yaml +++ b/etc/hertzbeat-collector.yaml @@ -33,3 +33,6 @@ collector: # Collector identity and mode identity: hertzbeat-collector-go mode: public + + metrics: + port: 9090 diff --git a/internal/cmd/server.go b/internal/cmd/server.go index 5980263..63b1882 100644 --- a/internal/cmd/server.go +++ b/internal/cmd/server.go @@ -31,6 +31,7 @@ import ( cfgloader "hertzbeat.apache.org/hertzbeat-collector-go/internal/config" "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect" jobserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/server" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/metrics" clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server" transportserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport" collectortypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector" @@ -109,13 +110,15 @@ func server(ctx context.Context, logOut io.Writer) error { } func startRunners(ctx context.Context, cfg *clrserver.Server) error { + // Create transport server first - transportRunner := transportserver.NewFromConfig(cfg.Config) + transportRunner := transportserver.New(cfg) // Create lazy message router that can get transport client when needed messageRouter := collect.NewLazyMessageRouter(transportRunner, cfg.Logger, cfg.Config.Collector.Identity) // Create job server with message router + // todo optimize not depend server start! jobRunner := jobserver.New(&jobserver.Config{ Server: *cfg, MessageRouter: messageRouter, @@ -124,12 +127,15 @@ func startRunners(ctx context.Context, cfg *clrserver.Server) error { // Connect transport to job scheduler transportRunner.SetJobScheduler(jobRunner) + // Create metrics runner + metricsRunner := metrics.New(cfg) + runners := []struct { runner Runner[collectortypes.Info] }{ + {metricsRunner}, {jobRunner}, {transportRunner}, - // todo; add metrics } errCh := make(chan error, len(runners)) diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 5e29267..70aa6bf 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -18,3 +18,137 @@ */ package metrics + +import ( + "context" + "fmt" + "net" + "net/http" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +const ( + Namespace = "hertzbeat" + Subsystem = "collector" +) + +var ( + // JobExecutionTotal counts the total number of job executions + JobExecutionTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "job_execution_total", + Help: "Total number of job executions", + }, + []string{"status", "type"}, + ) + + // JobExecutionDuration tracks the duration of job executions + JobExecutionDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "job_execution_duration_seconds", + Help: "Duration of job executions in seconds", + Buckets: prometheus.DefBuckets, + }, + []string{"type"}, + ) + + // CollectorUp indicates if the collector is up + CollectorUp = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "up", + Help: "1 if the collector is up, 0 otherwise", + }, + ) +) + +func init() { + // Register metrics with the global prometheus registry + prometheus.MustRegister(JobExecutionTotal) + prometheus.MustRegister(JobExecutionDuration) + prometheus.MustRegister(CollectorUp) + CollectorUp.Set(1) +} + +// Runner implements the metrics server runner +type Runner struct { + cfg *clrserver.Server + server *http.Server +} + +// New creates a new metrics runner +func New(cfg *clrserver.Server) *Runner { + + return &Runner{cfg: cfg} +} + +// Start starts the metrics server +func (r *Runner) Start(ctx context.Context) error { + + // init logger + mlog := r.initLogs() + + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + + addr := fmt.Sprintf(":%d", r.cfg.Config.Collector.MetricsConfig.Port) + r.server = &http.Server{ + Addr: addr, + Handler: mux, + ReadTimeout: 5 * time.Second, + ReadHeaderTimeout: 5 * time.Second, + WriteTimeout: 10 * time.Second, + IdleTimeout: 15 * time.Second, + MaxHeaderBytes: 1 << 20, // 1 MB + BaseContext: func(_ net.Listener) context.Context { + return ctx + }, + } + + mlog.Info("Starting metrics server", "addr", addr) + + go func() { + if err := r.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + mlog.Error(err, "Metrics server failed") + } + }() + + <-ctx.Done() + return r.Close() +} + +// Info returns the runner info +func (r *Runner) Info() collector.Info { + + return collector.Info{ + Name: "metrics-server", + } +} + +// Close closes the metrics server +func (r *Runner) Close() error { + + if r.server != nil { + r.initLogs().Info("Shutting down metrics server") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + return r.server.Shutdown(ctx) + } + return nil +} + +func (r *Runner) initLogs() logger.Logger { + + return r.cfg.Logger.WithName("metrics") +} diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 8d0c155..ffdebf2 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -27,8 +27,6 @@ import ( pb "hertzbeat.apache.org/hertzbeat-collector-go/api" clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server" "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector" - configtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config" - loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) @@ -44,18 +42,18 @@ const ( ) type Runner struct { - Config *configtypes.CollectorConfig client TransportClient jobScheduler JobScheduler - clrserver.Server + cfg clrserver.Server + tlog logger.Logger } -func New(cfg *configtypes.CollectorConfig) *Runner { +func New(cfg *clrserver.Server) *Runner { + return &Runner{ - Config: cfg, - Server: clrserver.Server{ - Logger: logger.Logger{}, // Will be initialized properly in Start method - }, + cfg: *cfg, + // init logger + tlog: cfg.Logger.WithName("transport"), } } @@ -64,24 +62,12 @@ func (r *Runner) SetJobScheduler(scheduler JobScheduler) { r.jobScheduler = scheduler } -// NewFromConfig creates a new transport runner from collector configuration -func NewFromConfig(cfg *configtypes.CollectorConfig) *Runner { - if cfg == nil { - return nil - } - return New(cfg) -} - func (r *Runner) Start(ctx context.Context) error { - // 初始化 Logger 如果它还没有被设置 - if r.Logger.IsZero() { - r.Logger = logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo) - } - r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", r.Info().Name) - r.Logger.Info("Starting transport client") + + r.tlog.Info("Starting transport client") // 构建 server 地址 - addr := fmt.Sprintf("%s:%s", r.Config.Collector.Manager.Host, r.Config.Collector.Manager.Port) + addr := fmt.Sprintf("%s:%s", r.cfg.Config.Collector.Manager.Host, r.cfg.Config.Collector.Manager.Port) if addr == ":" { // 如果配置为空,使用环境变量或默认值 if v := os.Getenv("MANAGER_ADDR"); v != "" { @@ -92,7 +78,7 @@ func (r *Runner) Start(ctx context.Context) error { } // 确定协议 - protocol := r.Config.Collector.Manager.Protocol + protocol := r.cfg.Config.Collector.Manager.Protocol if protocol == "" { if v := os.Getenv("MANAGER_PROTOCOL"); v != "" { protocol = v @@ -101,17 +87,17 @@ func (r *Runner) Start(ctx context.Context) error { } } - r.Logger.Info("Connecting to manager server", "addr", addr, "protocol", protocol) + r.tlog.Info("Connecting to manager server", "addr", addr, "protocol", protocol) // 创建客户端 factory := &TransportClientFactory{} client, err := factory.CreateClient(protocol, addr) if err != nil { - r.Logger.Error(err, "Failed to create transport client, will retry in background") + r.tlog.Error(err, "Failed to create transport client, will retry in background") // 不直接返回错误,而是继续启动,后续会在后台重试连接 } else { // Set the identity on the client if it supports it - identity := r.Config.Collector.Identity + identity := r.cfg.Config.Collector.Identity if identity == "" { identity = DefaultIdentity } @@ -128,77 +114,77 @@ func (r *Runner) Start(ctx context.Context) error { c.SetEventHandler(func(event Event) { switch event.Type { case EventConnected: - r.Logger.Info("Connected to manager gRPC server", "addr", event.Address) + r.tlog.Info("Connected to manager gRPC server", "addr", event.Address) go r.sendOnlineMessage() case EventDisconnected: - r.Logger.Info("Disconnected from manager gRPC server", "addr", event.Address) + r.tlog.Info("Disconnected from manager gRPC server", "addr", event.Address) case EventConnectFailed: - r.Logger.Error(event.Error, "Failed to connect to manager gRPC server, will retry", "addr", event.Address) + r.tlog.Error(event.Error, "Failed to connect to manager gRPC server, will retry", "addr", event.Address) } }) // Register processors with job scheduler if r.jobScheduler != nil { RegisterDefaultProcessors(c, r.jobScheduler) - r.Logger.Info("Registered gRPC processors with job scheduler") + r.tlog.Info("Registered gRPC processors with job scheduler") } else { RegisterDefaultProcessors(c, nil) - r.Logger.Info("Registered gRPC processors without job scheduler") + r.tlog.Info("Registered gRPC processors without job scheduler") } case *NettyClient: c.SetEventHandler(func(event Event) { switch event.Type { case EventConnected: - r.Logger.Info("Connected to manager netty server", "addr", event.Address) + r.tlog.Info("Connected to manager netty server", "addr", event.Address) go r.sendOnlineMessage() case EventDisconnected: - r.Logger.Info("Disconnected from manager netty server", "addr", event.Address) + r.tlog.Info("Disconnected from manager netty server", "addr", event.Address) case EventConnectFailed: - r.Logger.Error(event.Error, "Failed to connect to manager netty server, will retry", "addr", event.Address) + r.tlog.Error(event.Error, "Failed to connect to manager netty server, will retry", "addr", event.Address) } }) // Register processors with job scheduler if r.jobScheduler != nil { RegisterDefaultNettyProcessors(c, r.jobScheduler) - r.Logger.Info("Registered netty processors with job scheduler") + r.tlog.Info("Registered netty processors with job scheduler") } else { RegisterDefaultNettyProcessors(c, nil) - r.Logger.Info("Registered netty processors without job scheduler") + r.tlog.Info("Registered netty processors without job scheduler") } } // 尝试启动客户端,如果失败则启动重连循环 if err := r.client.Start(); err != nil { - r.Logger.Error(err, "Failed to start transport client on first attempt, starting retry loop") + r.tlog.Error(err, "Failed to start transport client on first attempt, starting retry loop") // 在后台启动重连循环 go func() { attempt := 0 for { attempt++ - r.Logger.Info("Reconnection attempt", "attempt", attempt, "wait", "3s") + r.tlog.Info("Reconnection attempt", "attempt", attempt, "wait", "3s") time.Sleep(3 * time.Second) // 检查是否应该停止 select { case <-ctx.Done(): - r.Logger.Info("Reconnection loop stopped due to context cancellation") + r.tlog.Info("Reconnection loop stopped due to context cancellation") return default: } - r.Logger.Info("Trying to connect to manager", "attempt", attempt, "addr", addr) + r.tlog.Info("Trying to connect to manager", "attempt", attempt, "addr", addr) if err := r.client.Start(); err == nil { - r.Logger.Info("Successfully connected to manager", "attempt", attempt) + r.tlog.Info("Successfully connected to manager", "attempt", attempt) return } else { - r.Logger.Error(err, "Connection failed, will retry", "attempt", attempt) + r.tlog.Error(err, "Connection failed, will retry", "attempt", attempt) } } }() } } - r.Logger.Info("Transport runner started successfully, connection will be established in background") + r.tlog.Info("Transport runner started successfully, connection will be established in background") // 创建新的context用于监控关闭信号 ctx, cancel := context.WithCancel(ctx) @@ -207,7 +193,7 @@ func (r *Runner) Start(ctx context.Context) error { // 监听 ctx.Done 优雅关闭 go func() { <-ctx.Done() - r.Logger.Info("Shutting down transport client...") + r.tlog.Info("Shutting down transport client...") if r.client != nil { _ = r.client.Shutdown() } @@ -221,13 +207,13 @@ func (r *Runner) Start(ctx context.Context) error { func (r *Runner) sendOnlineMessage() { if r.client != nil && r.client.IsStarted() { // Use the configured identity - identity := r.Config.Collector.Identity + identity := r.cfg.Config.Collector.Identity if identity == "" { identity = DefaultIdentity } // Create CollectorInfo JSON structure as expected by Java server - mode := r.Config.Collector.Mode + mode := r.cfg.Config.Collector.Mode if mode == "" { mode = DefaultMode // Default mode as in Java version } @@ -242,7 +228,7 @@ func (r *Runner) sendOnlineMessage() { // Convert to JSON bytes jsonData, err := json.Marshal(collectorInfo) if err != nil { - r.Logger.Error(err, "Failed to marshal collector info to JSON") + r.tlog.Error(err, "Failed to marshal collector info to JSON") return } @@ -253,12 +239,12 @@ func (r *Runner) sendOnlineMessage() { Msg: jsonData, } - r.Logger.Info("Sending online message", "identity", identity, "type", onlineMsg.Type) + r.tlog.Info("Sending online message", "identity", identity, "type", onlineMsg.Type) if err := r.client.SendMsg(onlineMsg); err != nil { - r.Logger.Error(err, "Failed to send online message", "identity", identity) + r.tlog.Error(err, "Failed to send online message", "identity", identity) } else { - r.Logger.Info("Online message sent successfully", "identity", identity) + r.tlog.Info("Online message sent successfully", "identity", identity) } } } @@ -270,19 +256,23 @@ func (r *Runner) Info() collector.Info { } func (r *Runner) Close() error { - r.Logger.Info("transport close...") + + r.tlog.Info("transport close...") if r.client != nil { _ = r.client.Shutdown() } + return nil } // GetClient returns the transport client (for testing and advanced usage) func (r *Runner) GetClient() TransportClient { + return r.client } // IsConnected returns whether the client is connected and started func (r *Runner) IsConnected() bool { + return r.client != nil && r.client.IsStarted() } diff --git a/internal/types/config/config_types.go b/internal/types/config/config_types.go index 4398cce..3dd544c 100644 --- a/internal/types/config/config_types.go +++ b/internal/types/config/config_types.go @@ -22,11 +22,12 @@ type CollectorConfig struct { } type CollectorSection struct { - Info CollectorInfo `yaml:"info"` - Log CollectorLogConfig `yaml:"log"` - Manager ManagerConfig `yaml:"manager"` - Identity string `yaml:"identity"` - Mode string `yaml:"mode"` + Info CollectorInfo `yaml:"info"` + Log CollectorLogConfig `yaml:"log"` + Manager ManagerConfig `yaml:"manager"` + Identity string `yaml:"identity"` + Mode string `yaml:"mode"` + MetricsConfig MetricsConfig `yaml:"metrics"` // todo dispatcher } @@ -46,3 +47,8 @@ type ManagerConfig struct { Port string `yaml:"port"` Protocol string `yaml:"protocol"` } + +type MetricsConfig struct { + // Others? + Port int `yaml:"port"` +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
