This is an automated email from the ASF dual-hosted git repository.
shown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
The following commit(s) were added to refs/heads/main by this push:
new b80f4aa feat: add base metrics endpoint (#37)
b80f4aa is described below
commit b80f4aa1cadd7736779084ee62887a6215d64d9d
Author: shown <[email protected]>
AuthorDate: Sat Dec 27 15:41:02 2025 +0800
feat: add base metrics endpoint (#37)
Co-authored-by: lynx009 <[email protected]>
---
etc/hertzbeat-collector.yaml | 3 +
internal/cmd/server.go | 10 ++-
internal/metrics/metrics.go | 134 ++++++++++++++++++++++++++++++++++
internal/transport/transport.go | 96 +++++++++++-------------
internal/types/config/config_types.go | 16 ++--
5 files changed, 199 insertions(+), 60 deletions(-)
diff --git a/etc/hertzbeat-collector.yaml b/etc/hertzbeat-collector.yaml
index 5e87865..c9893c1 100644
--- a/etc/hertzbeat-collector.yaml
+++ b/etc/hertzbeat-collector.yaml
@@ -33,3 +33,6 @@ collector:
# Collector identity and mode
identity: hertzbeat-collector-go
mode: public
+
+ metrics:
+ port: 9090
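The new metrics.port key is consumed by the metrics runner introduced below
and controls where the Prometheus endpoint listens. As a quick smoke test,
scraping the endpoint from Go could look like this (a minimal sketch assuming
a collector running locally on the port configured above; the expected metric
name follows from the Namespace/Subsystem constants in
internal/metrics/metrics.go):

    package main

    import (
        "fmt"
        "io"
        "log"
        "net/http"
    )

    func main() {
        // Fetch the Prometheus exposition text from the collector.
        resp, err := http.Get("http://localhost:9090/metrics")
        if err != nil {
            log.Fatal(err)
        }
        defer resp.Body.Close()

        body, err := io.ReadAll(resp.Body)
        if err != nil {
            log.Fatal(err)
        }
        // Expect hertzbeat_collector_up 1 somewhere in the output.
        fmt.Println(string(body))
    }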
diff --git a/internal/cmd/server.go b/internal/cmd/server.go
index 5980263..63b1882 100644
--- a/internal/cmd/server.go
+++ b/internal/cmd/server.go
@@ -31,6 +31,7 @@ import (
cfgloader "hertzbeat.apache.org/hertzbeat-collector-go/internal/config"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect"
jobserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/server"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/metrics"
clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server"
transportserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
collectortypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector"
@@ -109,13 +110,15 @@ func server(ctx context.Context, logOut io.Writer) error {
}
func startRunners(ctx context.Context, cfg *clrserver.Server) error {
+
// Create transport server first
- transportRunner := transportserver.NewFromConfig(cfg.Config)
+ transportRunner := transportserver.New(cfg)
// Create lazy message router that can get transport client when needed
messageRouter := collect.NewLazyMessageRouter(transportRunner,
cfg.Logger, cfg.Config.Collector.Identity)
// Create job server with message router
+ // TODO: optimize so this does not depend on server start
jobRunner := jobserver.New(&jobserver.Config{
Server: *cfg,
MessageRouter: messageRouter,
@@ -124,12 +127,15 @@ func startRunners(ctx context.Context, cfg *clrserver.Server) error {
// Connect transport to job scheduler
transportRunner.SetJobScheduler(jobRunner)
+ // Create metrics runner
+ metricsRunner := metrics.New(cfg)
+
runners := []struct {
runner Runner[collectortypes.Info]
}{
+ {metricsRunner},
{jobRunner},
{transportRunner},
- // todo; add metrics
}
errCh := make(chan error, len(runners))
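For context, every entry in runners is started concurrently and reports into
errCh. A sketch of that fan-out pattern (the Runner interface shape here is
inferred from the Start/Info/Close methods visible in this patch, not copied
from the repository):

    package main

    import "context"

    // Runner mirrors the shape implied by the metrics and transport runners.
    type Runner interface {
        Start(ctx context.Context) error
        Close() error
    }

    // startAll launches each runner in its own goroutine and returns the
    // first runner error, or ctx.Err() once the context is cancelled.
    func startAll(ctx context.Context, runners []Runner) error {
        errCh := make(chan error, len(runners))
        for _, r := range runners {
            go func(r Runner) {
                errCh <- r.Start(ctx)
            }(r)
        }
        select {
        case err := <-errCh:
            return err
        case <-ctx.Done():
            return ctx.Err()
        }
    }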
diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go
index 5e29267..70aa6bf 100644
--- a/internal/metrics/metrics.go
+++ b/internal/metrics/metrics.go
@@ -18,3 +18,137 @@
*/
package metrics
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "net/http"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+ clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+const (
+ Namespace = "hertzbeat"
+ Subsystem = "collector"
+)
+
+var (
+ // JobExecutionTotal counts the total number of job executions
+ JobExecutionTotal = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: Namespace,
+ Subsystem: Subsystem,
+ Name: "job_execution_total",
+ Help: "Total number of job executions",
+ },
+ []string{"status", "type"},
+ )
+
+ // JobExecutionDuration tracks the duration of job executions
+ JobExecutionDuration = prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: Namespace,
+ Subsystem: Subsystem,
+ Name: "job_execution_duration_seconds",
+ Help: "Duration of job executions in seconds",
+ Buckets: prometheus.DefBuckets,
+ },
+ []string{"type"},
+ )
+
+ // CollectorUp indicates if the collector is up
+ CollectorUp = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Namespace: Namespace,
+ Subsystem: Subsystem,
+ Name: "up",
+ Help: "1 if the collector is up, 0 otherwise",
+ },
+ )
+)
+
+func init() {
+ // Register metrics with the global prometheus registry
+ prometheus.MustRegister(JobExecutionTotal)
+ prometheus.MustRegister(JobExecutionDuration)
+ prometheus.MustRegister(CollectorUp)
+ CollectorUp.Set(1)
+}
+
+// Runner implements the metrics server runner
+type Runner struct {
+ cfg *clrserver.Server
+ server *http.Server
+}
+
+// New creates a new metrics runner
+func New(cfg *clrserver.Server) *Runner {
+
+ return &Runner{cfg: cfg}
+}
+
+// Start starts the metrics server
+func (r *Runner) Start(ctx context.Context) error {
+
+ // init logger
+ mlog := r.initLogs()
+
+ mux := http.NewServeMux()
+ mux.Handle("/metrics", promhttp.Handler())
+
+ addr := fmt.Sprintf(":%d", r.cfg.Config.Collector.MetricsConfig.Port)
+ r.server = &http.Server{
+ Addr: addr,
+ Handler: mux,
+ ReadTimeout: 5 * time.Second,
+ ReadHeaderTimeout: 5 * time.Second,
+ WriteTimeout: 10 * time.Second,
+ IdleTimeout: 15 * time.Second,
+ MaxHeaderBytes: 1 << 20, // 1 MB
+ BaseContext: func(_ net.Listener) context.Context {
+ return ctx
+ },
+ }
+
+ mlog.Info("Starting metrics server", "addr", addr)
+
+ go func() {
+ if err := r.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+ mlog.Error(err, "Metrics server failed")
+ }
+ }()
+
+ <-ctx.Done()
+ return r.Close()
+}
+
+// Info returns the runner info
+func (r *Runner) Info() collector.Info {
+
+ return collector.Info{
+ Name: "metrics-server",
+ }
+}
+
+// Close closes the metrics server
+func (r *Runner) Close() error {
+
+ if r.server != nil {
+ r.initLogs().Info("Shutting down metrics server")
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ return r.server.Shutdown(ctx)
+ }
+ return nil
+}
+
+func (r *Runner) initLogs() logger.Logger {
+
+ return r.cfg.Logger.WithName("metrics")
+}
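Since JobExecutionTotal and JobExecutionDuration are exported package-level
vectors, job code elsewhere in the collector can record outcomes directly.
A minimal sketch of instrumenting a job execution (the wrapper and the label
values are illustrative, not part of this patch):

    package collect

    import (
        "time"

        "hertzbeat.apache.org/hertzbeat-collector-go/internal/metrics"
    )

    // runInstrumented is a hypothetical wrapper around a job function.
    func runInstrumented(jobType string, job func() error) error {
        start := time.Now()
        err := job()

        // Record the duration regardless of outcome (label: type).
        metrics.JobExecutionDuration.WithLabelValues(jobType).Observe(time.Since(start).Seconds())

        // Count the execution; label order is status, type -- matching the
        // vector definition above.
        status := "success"
        if err != nil {
            status = "failure"
        }
        metrics.JobExecutionTotal.WithLabelValues(status, jobType).Inc()
        return err
    }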
diff --git a/internal/transport/transport.go b/internal/transport/transport.go
index 8d0c155..ffdebf2 100644
--- a/internal/transport/transport.go
+++ b/internal/transport/transport.go
@@ -27,8 +27,6 @@ import (
pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector"
- configtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
- loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
@@ -44,18 +42,18 @@ const (
)
type Runner struct {
- Config *configtypes.CollectorConfig
client TransportClient
jobScheduler JobScheduler
- clrserver.Server
+ cfg clrserver.Server
+ tlog logger.Logger
}
-func New(cfg *configtypes.CollectorConfig) *Runner {
+func New(cfg *clrserver.Server) *Runner {
+
return &Runner{
- Config: cfg,
- Server: clrserver.Server{
- Logger: logger.Logger{}, // Will be initialized properly in Start method
- },
+ cfg: *cfg,
+ // init logger
+ tlog: cfg.Logger.WithName("transport"),
}
}
@@ -64,24 +62,12 @@ func (r *Runner) SetJobScheduler(scheduler JobScheduler) {
r.jobScheduler = scheduler
}
-// NewFromConfig creates a new transport runner from collector configuration
-func NewFromConfig(cfg *configtypes.CollectorConfig) *Runner {
- if cfg == nil {
- return nil
- }
- return New(cfg)
-}
-
func (r *Runner) Start(ctx context.Context) error {
- // Initialize the Logger if it has not been set yet
- if r.Logger.IsZero() {
- r.Logger = logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo)
- }
- r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", r.Info().Name)
- r.Logger.Info("Starting transport client")
+
+ r.tlog.Info("Starting transport client")
// Build the manager server address
- addr := fmt.Sprintf("%s:%s", r.Config.Collector.Manager.Host, r.Config.Collector.Manager.Port)
+ addr := fmt.Sprintf("%s:%s", r.cfg.Config.Collector.Manager.Host, r.cfg.Config.Collector.Manager.Port)
if addr == ":" {
// If the config is empty, fall back to the environment variable or a default
if v := os.Getenv("MANAGER_ADDR"); v != "" {
@@ -92,7 +78,7 @@ func (r *Runner) Start(ctx context.Context) error {
}
// Determine the protocol
- protocol := r.Config.Collector.Manager.Protocol
+ protocol := r.cfg.Config.Collector.Manager.Protocol
if protocol == "" {
if v := os.Getenv("MANAGER_PROTOCOL"); v != "" {
protocol = v
@@ -101,17 +87,17 @@ func (r *Runner) Start(ctx context.Context) error {
}
}
- r.Logger.Info("Connecting to manager server", "addr", addr, "protocol", protocol)
+ r.tlog.Info("Connecting to manager server", "addr", addr, "protocol", protocol)
// Create the client
factory := &TransportClientFactory{}
client, err := factory.CreateClient(protocol, addr)
if err != nil {
- r.Logger.Error(err, "Failed to create transport client, will retry in background")
+ r.tlog.Error(err, "Failed to create transport client, will retry in background")
// Do not return the error directly; keep starting and retry the connection in the background
} else {
// Set the identity on the client if it supports it
- identity := r.Config.Collector.Identity
+ identity := r.cfg.Config.Collector.Identity
if identity == "" {
identity = DefaultIdentity
}
@@ -128,77 +114,77 @@ func (r *Runner) Start(ctx context.Context) error {
c.SetEventHandler(func(event Event) {
switch event.Type {
case EventConnected:
- r.Logger.Info("Connected to manager gRPC server", "addr", event.Address)
+ r.tlog.Info("Connected to manager gRPC server", "addr", event.Address)
go r.sendOnlineMessage()
case EventDisconnected:
- r.Logger.Info("Disconnected from manager gRPC server", "addr", event.Address)
+ r.tlog.Info("Disconnected from manager gRPC server", "addr", event.Address)
case EventConnectFailed:
- r.Logger.Error(event.Error, "Failed to connect to manager gRPC server, will retry", "addr", event.Address)
+ r.tlog.Error(event.Error, "Failed to connect to manager gRPC server, will retry", "addr", event.Address)
}
})
// Register processors with job scheduler
if r.jobScheduler != nil {
RegisterDefaultProcessors(c, r.jobScheduler)
- r.Logger.Info("Registered gRPC processors with job scheduler")
+ r.tlog.Info("Registered gRPC processors with job scheduler")
} else {
RegisterDefaultProcessors(c, nil)
- r.Logger.Info("Registered gRPC processors without job scheduler")
+ r.tlog.Info("Registered gRPC processors without job scheduler")
}
case *NettyClient:
c.SetEventHandler(func(event Event) {
switch event.Type {
case EventConnected:
- r.Logger.Info("Connected to manager netty server", "addr", event.Address)
+ r.tlog.Info("Connected to manager netty server", "addr", event.Address)
go r.sendOnlineMessage()
case EventDisconnected:
- r.Logger.Info("Disconnected from manager netty server", "addr", event.Address)
+ r.tlog.Info("Disconnected from manager netty server", "addr", event.Address)
case EventConnectFailed:
- r.Logger.Error(event.Error, "Failed to connect to manager netty server, will retry", "addr", event.Address)
+ r.tlog.Error(event.Error, "Failed to connect to manager netty server, will retry", "addr", event.Address)
}
})
// Register processors with job scheduler
if r.jobScheduler != nil {
RegisterDefaultNettyProcessors(c, r.jobScheduler)
- r.Logger.Info("Registered netty processors with job scheduler")
+ r.tlog.Info("Registered netty processors with job scheduler")
} else {
RegisterDefaultNettyProcessors(c, nil)
- r.Logger.Info("Registered netty processors without job scheduler")
+ r.tlog.Info("Registered netty processors without job scheduler")
}
}
// Try to start the client; if it fails, start a reconnection loop
if err := r.client.Start(); err != nil {
- r.Logger.Error(err, "Failed to start transport client on first attempt, starting retry loop")
+ r.tlog.Error(err, "Failed to start transport client on first attempt, starting retry loop")
// Start the reconnection loop in the background
go func() {
attempt := 0
for {
attempt++
- r.Logger.Info("Reconnection attempt", "attempt", attempt, "wait", "3s")
+ r.tlog.Info("Reconnection attempt", "attempt", attempt, "wait", "3s")
time.Sleep(3 * time.Second)
// Check whether we should stop
select {
case <-ctx.Done():
- r.Logger.Info("Reconnection loop stopped due to context cancellation")
+ r.tlog.Info("Reconnection loop stopped due to context cancellation")
return
default:
}
- r.Logger.Info("Trying to connect to manager", "attempt", attempt, "addr", addr)
+ r.tlog.Info("Trying to connect to manager", "attempt", attempt, "addr", addr)
if err := r.client.Start(); err == nil {
- r.Logger.Info("Successfully connected to manager", "attempt", attempt)
+ r.tlog.Info("Successfully connected to manager", "attempt", attempt)
return
} else {
- r.Logger.Error(err, "Connection failed, will retry", "attempt", attempt)
+ r.tlog.Error(err, "Connection failed, will retry", "attempt", attempt)
}
}
}()
}
}
- r.Logger.Info("Transport runner started successfully, connection will be established in background")
+ r.tlog.Info("Transport runner started successfully, connection will be established in background")
// Create a new context to watch for the shutdown signal
ctx, cancel := context.WithCancel(ctx)
@@ -207,7 +193,7 @@ func (r *Runner) Start(ctx context.Context) error {
// Watch ctx.Done for graceful shutdown
go func() {
<-ctx.Done()
- r.Logger.Info("Shutting down transport client...")
+ r.tlog.Info("Shutting down transport client...")
if r.client != nil {
_ = r.client.Shutdown()
}
@@ -221,13 +207,13 @@ func (r *Runner) Start(ctx context.Context) error {
func (r *Runner) sendOnlineMessage() {
if r.client != nil && r.client.IsStarted() {
// Use the configured identity
- identity := r.Config.Collector.Identity
+ identity := r.cfg.Config.Collector.Identity
if identity == "" {
identity = DefaultIdentity
}
// Create CollectorInfo JSON structure as expected by Java server
- mode := r.Config.Collector.Mode
+ mode := r.cfg.Config.Collector.Mode
if mode == "" {
mode = DefaultMode // Default mode as in Java version
}
@@ -242,7 +228,7 @@ func (r *Runner) sendOnlineMessage() {
// Convert to JSON bytes
jsonData, err := json.Marshal(collectorInfo)
if err != nil {
- r.Logger.Error(err, "Failed to marshal collector info to JSON")
+ r.tlog.Error(err, "Failed to marshal collector info to JSON")
return
}
@@ -253,12 +239,12 @@ func (r *Runner) sendOnlineMessage() {
Msg: jsonData,
}
- r.Logger.Info("Sending online message", "identity", identity, "type", onlineMsg.Type)
+ r.tlog.Info("Sending online message", "identity", identity, "type", onlineMsg.Type)
if err := r.client.SendMsg(onlineMsg); err != nil {
- r.Logger.Error(err, "Failed to send online message", "identity", identity)
+ r.tlog.Error(err, "Failed to send online message", "identity", identity)
} else {
- r.Logger.Info("Online message sent successfully", "identity", identity)
+ r.tlog.Info("Online message sent successfully", "identity", identity)
}
}
}
@@ -270,19 +256,23 @@ func (r *Runner) Info() collector.Info {
}
func (r *Runner) Close() error {
- r.Logger.Info("transport close...")
+
+ r.tlog.Info("transport close...")
if r.client != nil {
_ = r.client.Shutdown()
}
+
return nil
}
// GetClient returns the transport client (for testing and advanced usage)
func (r *Runner) GetClient() TransportClient {
+
return r.client
}
// IsConnected returns whether the client is connected and started
func (r *Runner) IsConnected() bool {
+
return r.client != nil && r.client.IsStarted()
}
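One note on the retry loop above: it sleeps a fixed 3s and only afterwards
polls ctx, so cancellation can be delayed by up to a full interval. A
context-aware variant (a sketch of an alternative, not what this patch does)
folds the wait and the cancellation check into one select:

    package transport

    import (
        "context"
        "time"
    )

    // reconnect retries start() every 3s until it succeeds or ctx is cancelled.
    func reconnect(ctx context.Context, start func() error) {
        for {
            select {
            case <-ctx.Done():
                return // cancelled during the wait, no extra delay
            case <-time.After(3 * time.Second):
            }
            if err := start(); err == nil {
                return
            }
        }
    }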
diff --git a/internal/types/config/config_types.go b/internal/types/config/config_types.go
index 4398cce..3dd544c 100644
--- a/internal/types/config/config_types.go
+++ b/internal/types/config/config_types.go
@@ -22,11 +22,12 @@ type CollectorConfig struct {
}
type CollectorSection struct {
- Info CollectorInfo `yaml:"info"`
- Log CollectorLogConfig `yaml:"log"`
- Manager ManagerConfig `yaml:"manager"`
- Identity string `yaml:"identity"`
- Mode string `yaml:"mode"`
+ Info CollectorInfo `yaml:"info"`
+ Log CollectorLogConfig `yaml:"log"`
+ Manager ManagerConfig `yaml:"manager"`
+ Identity string `yaml:"identity"`
+ Mode string `yaml:"mode"`
+ MetricsConfig MetricsConfig `yaml:"metrics"`
// todo dispatcher
}
@@ -46,3 +47,8 @@ type ManagerConfig struct {
Port string `yaml:"port"`
Protocol string `yaml:"protocol"`
}
+
+type MetricsConfig struct {
+ // Others?
+ Port int `yaml:"port"`
+}
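With these struct tags, the metrics block added to etc/hertzbeat-collector.yaml
maps directly onto CollectorSection. A standalone sketch of the tag mapping
using gopkg.in/yaml.v3 (the actual loader lives in internal/config and may
differ):

    package main

    import (
        "fmt"
        "log"

        "gopkg.in/yaml.v3"
    )

    type MetricsConfig struct {
        Port int `yaml:"port"`
    }

    type CollectorSection struct {
        Identity      string        `yaml:"identity"`
        Mode          string        `yaml:"mode"`
        MetricsConfig MetricsConfig `yaml:"metrics"`
    }

    func main() {
        raw := []byte("identity: hertzbeat-collector-go\nmode: public\nmetrics:\n  port: 9090\n")
        var sec CollectorSection
        if err := yaml.Unmarshal(raw, &sec); err != nil {
            log.Fatal(err)
        }
        fmt.Println(sec.MetricsConfig.Port) // prints 9090
    }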
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]