This is an automated email from the ASF dual-hosted git repository.

shown pushed a commit to branch 1217-yuluo/add-o11y
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git

commit 4a6917188e31d153bddda6c03f2f2dd4057ef704
Author: yuluo-yx <[email protected]>
AuthorDate: Wed Dec 17 23:46:43 2025 +0800

    feat: add base metrics endpoint
    
    Signed-off-by: yuluo-yx <[email protected]>
---
 etc/hertzbeat-collector.yaml          |   3 +
 internal/cmd/server.go                |  10 ++-
 internal/metrics/metrics.go           | 134 ++++++++++++++++++++++++++++++++++
 internal/transport/transport.go       |  96 +++++++++++-------------
 internal/types/config/config_types.go |  16 ++--
 5 files changed, 199 insertions(+), 60 deletions(-)

diff --git a/etc/hertzbeat-collector.yaml b/etc/hertzbeat-collector.yaml
index 5e87865..c9893c1 100644
--- a/etc/hertzbeat-collector.yaml
+++ b/etc/hertzbeat-collector.yaml
@@ -33,3 +33,6 @@ collector:
   # Collector identity and mode
   identity: hertzbeat-collector-go
   mode: public
+
+  metrics:
+    port: 9090
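
The new metrics block makes the collector serve Prometheus metrics on port 9090 (wired up in internal/metrics/metrics.go below). A minimal sketch for checking the endpoint from Go once the collector is running; the localhost address is an assumption, only the port comes from the config above:

    package main

    import (
        "fmt"
        "io"
        "net/http"
    )

    // Quick manual check of the collector's /metrics endpoint.
    // Assumption: the collector runs locally with metrics.port set to 9090.
    func main() {
        resp, err := http.Get("http://localhost:9090/metrics")
        if err != nil {
            panic(err)
        }
        defer resp.Body.Close()

        body, err := io.ReadAll(resp.Body)
        if err != nil {
            panic(err)
        }
        fmt.Println(resp.Status)
        fmt.Println(string(body))
    }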
diff --git a/internal/cmd/server.go b/internal/cmd/server.go
index 5980263..63b1882 100644
--- a/internal/cmd/server.go
+++ b/internal/cmd/server.go
@@ -31,6 +31,7 @@ import (
        cfgloader "hertzbeat.apache.org/hertzbeat-collector-go/internal/config"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect"
        jobserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/server"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/metrics"
        clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server"
        transportserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
        collectortypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector"
@@ -109,13 +110,15 @@ func server(ctx context.Context, logOut io.Writer) error {
 }
 
 func startRunners(ctx context.Context, cfg *clrserver.Server) error {
+
        // Create transport server first
-       transportRunner := transportserver.NewFromConfig(cfg.Config)
+       transportRunner := transportserver.New(cfg)
 
        // Create lazy message router that can get transport client when needed
        messageRouter := collect.NewLazyMessageRouter(transportRunner, cfg.Logger, cfg.Config.Collector.Identity)
 
        // Create job server with message router
+       // TODO: optimize so this does not depend on the server starting first
        jobRunner := jobserver.New(&jobserver.Config{
                Server:        *cfg,
                MessageRouter: messageRouter,
@@ -124,12 +127,15 @@ func startRunners(ctx context.Context, cfg *clrserver.Server) error {
        // Connect transport to job scheduler
        transportRunner.SetJobScheduler(jobRunner)
 
+       // Create metrics runner
+       metricsRunner := metrics.New(cfg)
+
        runners := []struct {
                runner Runner[collectortypes.Info]
        }{
+               {metricsRunner},
                {jobRunner},
                {transportRunner},
-               // todo; add metrics
        }
 
        errCh := make(chan error, len(runners))
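
The metrics runner is registered alongside the job and transport runners, so it has to satisfy the same Runner interface. That interface definition is not part of this diff; a sketch consistent with the methods the metrics runner implements would be:

    package server

    import (
        "context"
    )

    // Sketch only: the actual Runner interface lives elsewhere in the repo and
    // is not shown in this commit. It is inferred here from the methods the
    // metrics runner implements (Start, Info, Close) and the generic
    // Runner[collectortypes.Info] usage above.
    type Runner[T any] interface {
        Start(ctx context.Context) error
        Info() T
        Close() error
    }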
diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go
index 5e29267..70aa6bf 100644
--- a/internal/metrics/metrics.go
+++ b/internal/metrics/metrics.go
@@ -18,3 +18,137 @@
  */
 
 package metrics
+
+import (
+       "context"
+       "fmt"
+       "net"
+       "net/http"
+       "time"
+
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promhttp"
+       clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+const (
+       Namespace = "hertzbeat"
+       Subsystem = "collector"
+)
+
+var (
+       // JobExecutionTotal counts the total number of job executions
+       JobExecutionTotal = prometheus.NewCounterVec(
+               prometheus.CounterOpts{
+                       Namespace: Namespace,
+                       Subsystem: Subsystem,
+                       Name:      "job_execution_total",
+                       Help:      "Total number of job executions",
+               },
+               []string{"status", "type"},
+       )
+
+       // JobExecutionDuration tracks the duration of job executions
+       JobExecutionDuration = prometheus.NewHistogramVec(
+               prometheus.HistogramOpts{
+                       Namespace: Namespace,
+                       Subsystem: Subsystem,
+                       Name:      "job_execution_duration_seconds",
+                       Help:      "Duration of job executions in seconds",
+                       Buckets:   prometheus.DefBuckets,
+               },
+               []string{"type"},
+       )
+
+       // CollectorUp indicates if the collector is up
+       CollectorUp = prometheus.NewGauge(
+               prometheus.GaugeOpts{
+                       Namespace: Namespace,
+                       Subsystem: Subsystem,
+                       Name:      "up",
+                       Help:      "1 if the collector is up, 0 otherwise",
+               },
+       )
+)
+
+func init() {
+       // Register metrics with the global prometheus registry
+       prometheus.MustRegister(JobExecutionTotal)
+       prometheus.MustRegister(JobExecutionDuration)
+       prometheus.MustRegister(CollectorUp)
+       CollectorUp.Set(1)
+}
+
+// Runner implements the metrics server runner
+type Runner struct {
+       cfg    *clrserver.Server
+       server *http.Server
+}
+
+// New creates a new metrics runner
+func New(cfg *clrserver.Server) *Runner {
+
+       return &Runner{cfg: cfg}
+}
+
+// Start starts the metrics server
+func (r *Runner) Start(ctx context.Context) error {
+
+       // init logger
+       mlog := r.initLogs()
+
+       mux := http.NewServeMux()
+       mux.Handle("/metrics", promhttp.Handler())
+
+       addr := fmt.Sprintf(":%d", r.cfg.Config.Collector.MetricsConfig.Port)
+       r.server = &http.Server{
+               Addr:              addr,
+               Handler:           mux,
+               ReadTimeout:       5 * time.Second,
+               ReadHeaderTimeout: 5 * time.Second,
+               WriteTimeout:      10 * time.Second,
+               IdleTimeout:       15 * time.Second,
+               MaxHeaderBytes:    1 << 20, // 1 MB
+               BaseContext: func(_ net.Listener) context.Context {
+                       return ctx
+               },
+       }
+
+       mlog.Info("Starting metrics server", "addr", addr)
+
+       go func() {
+               if err := r.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+                       mlog.Error(err, "Metrics server failed")
+               }
+       }()
+
+       <-ctx.Done()
+       return r.Close()
+}
+
+// Info returns the runner info
+func (r *Runner) Info() collector.Info {
+
+       return collector.Info{
+               Name: "metrics-server",
+       }
+}
+
+// Close closes the metrics server
+func (r *Runner) Close() error {
+
+       if r.server != nil {
+               r.initLogs().Info("Shutting down metrics server")
+               ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+               defer cancel()
+               return r.server.Shutdown(ctx)
+       }
+       return nil
+}
+
+func (r *Runner) initLogs() logger.Logger {
+
+       return r.cfg.Logger.WithName("metrics")
+}
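
JobExecutionTotal and JobExecutionDuration are exported so collection code can record job outcomes. A hedged sketch of the intended usage; the helper function and label values are illustrative and not part of this commit:

    package collect

    import (
        "time"

        "hertzbeat.apache.org/hertzbeat-collector-go/internal/metrics"
    )

    // recordJob is an illustrative helper, not part of this commit. It times a
    // collection run, observes the duration with the job type as label, and
    // increments the execution counter with a success/failure status label.
    func recordJob(jobType string, run func() error) error {
        start := time.Now()
        err := run()
        metrics.JobExecutionDuration.WithLabelValues(jobType).Observe(time.Since(start).Seconds())

        status := "success"
        if err != nil {
            status = "failure"
        }
        metrics.JobExecutionTotal.WithLabelValues(status, jobType).Inc()
        return err
    }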
diff --git a/internal/transport/transport.go b/internal/transport/transport.go
index 8d0c155..ffdebf2 100644
--- a/internal/transport/transport.go
+++ b/internal/transport/transport.go
@@ -27,8 +27,6 @@ import (
        pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
        clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector"
-       configtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
-       loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
@@ -44,18 +42,18 @@ const (
 )
 
 type Runner struct {
-       Config       *configtypes.CollectorConfig
        client       TransportClient
        jobScheduler JobScheduler
-       clrserver.Server
+       cfg          clrserver.Server
+       tlog         logger.Logger
 }
 
-func New(cfg *configtypes.CollectorConfig) *Runner {
+func New(cfg *clrserver.Server) *Runner {
+
        return &Runner{
-               Config: cfg,
-               Server: clrserver.Server{
-                       Logger: logger.Logger{}, // Will be initialized properly in Start method
-               },
+               cfg: *cfg,
+               // init logger
+               tlog: cfg.Logger.WithName("transport"),
        }
 }
 
@@ -64,24 +62,12 @@ func (r *Runner) SetJobScheduler(scheduler JobScheduler) {
        r.jobScheduler = scheduler
 }
 
-// NewFromConfig creates a new transport runner from collector configuration
-func NewFromConfig(cfg *configtypes.CollectorConfig) *Runner {
-       if cfg == nil {
-               return nil
-       }
-       return New(cfg)
-}
-
 func (r *Runner) Start(ctx context.Context) error {
-       // Initialize the Logger if it has not been set yet
-       if r.Logger.IsZero() {
-               r.Logger = logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo)
-       }
-       r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", r.Info().Name)
-       r.Logger.Info("Starting transport client")
+
+       r.tlog.Info("Starting transport client")
 
        // build the manager server address
-       addr := fmt.Sprintf("%s:%s", r.Config.Collector.Manager.Host, r.Config.Collector.Manager.Port)
+       addr := fmt.Sprintf("%s:%s", r.cfg.Config.Collector.Manager.Host, r.cfg.Config.Collector.Manager.Port)
        if addr == ":" {
                // if the config is empty, fall back to an environment variable or the default
                if v := os.Getenv("MANAGER_ADDR"); v != "" {
@@ -92,7 +78,7 @@ func (r *Runner) Start(ctx context.Context) error {
        }
 
        // determine the protocol
-       protocol := r.Config.Collector.Manager.Protocol
+       protocol := r.cfg.Config.Collector.Manager.Protocol
        if protocol == "" {
                if v := os.Getenv("MANAGER_PROTOCOL"); v != "" {
                        protocol = v
@@ -101,17 +87,17 @@ func (r *Runner) Start(ctx context.Context) error {
                }
        }
 
-       r.Logger.Info("Connecting to manager server", "addr", addr, "protocol", 
protocol)
+       r.tlog.Info("Connecting to manager server", "addr", addr, "protocol", 
protocol)
 
        // 创建客户端
        factory := &TransportClientFactory{}
        client, err := factory.CreateClient(protocol, addr)
        if err != nil {
-               r.Logger.Error(err, "Failed to create transport client, will 
retry in background")
+               r.tlog.Error(err, "Failed to create transport client, will 
retry in background")
                // 不直接返回错误,而是继续启动,后续会在后台重试连接
        } else {
                // Set the identity on the client if it supports it
-               identity := r.Config.Collector.Identity
+               identity := r.cfg.Config.Collector.Identity
                if identity == "" {
                        identity = DefaultIdentity
                }
@@ -128,77 +114,77 @@ func (r *Runner) Start(ctx context.Context) error {
                        c.SetEventHandler(func(event Event) {
                                switch event.Type {
                                case EventConnected:
-                                       r.Logger.Info("Connected to manager 
gRPC server", "addr", event.Address)
+                                       r.tlog.Info("Connected to manager gRPC 
server", "addr", event.Address)
                                        go r.sendOnlineMessage()
                                case EventDisconnected:
-                                       r.Logger.Info("Disconnected from 
manager gRPC server", "addr", event.Address)
+                                       r.tlog.Info("Disconnected from manager 
gRPC server", "addr", event.Address)
                                case EventConnectFailed:
-                                       r.Logger.Error(event.Error, "Failed to connect to manager gRPC server, will retry", "addr", event.Address)
+                                       r.tlog.Error(event.Error, "Failed to connect to manager gRPC server, will retry", "addr", event.Address)
                                }
                        })
                        // Register processors with job scheduler
                        if r.jobScheduler != nil {
                                RegisterDefaultProcessors(c, r.jobScheduler)
-                               r.Logger.Info("Registered gRPC processors with 
job scheduler")
+                               r.tlog.Info("Registered gRPC processors with 
job scheduler")
                        } else {
                                RegisterDefaultProcessors(c, nil)
-                               r.Logger.Info("Registered gRPC processors 
without job scheduler")
+                               r.tlog.Info("Registered gRPC processors without 
job scheduler")
                        }
                case *NettyClient:
                        c.SetEventHandler(func(event Event) {
                                switch event.Type {
                                case EventConnected:
-                                       r.Logger.Info("Connected to manager 
netty server", "addr", event.Address)
+                                       r.tlog.Info("Connected to manager netty 
server", "addr", event.Address)
                                        go r.sendOnlineMessage()
                                case EventDisconnected:
-                                       r.Logger.Info("Disconnected from 
manager netty server", "addr", event.Address)
+                                       r.tlog.Info("Disconnected from manager 
netty server", "addr", event.Address)
                                case EventConnectFailed:
-                                       r.Logger.Error(event.Error, "Failed to connect to manager netty server, will retry", "addr", event.Address)
+                                       r.tlog.Error(event.Error, "Failed to connect to manager netty server, will retry", "addr", event.Address)
                                }
                        })
                        // Register processors with job scheduler
                        if r.jobScheduler != nil {
                                RegisterDefaultNettyProcessors(c, r.jobScheduler)
-                               r.Logger.Info("Registered netty processors with job scheduler")
+                               r.tlog.Info("Registered netty processors with job scheduler")
                        } else {
                                RegisterDefaultNettyProcessors(c, nil)
-                               r.Logger.Info("Registered netty processors 
without job scheduler")
+                               r.tlog.Info("Registered netty processors 
without job scheduler")
                        }
                }
 
                // try to start the client; if it fails, start a reconnect loop
                if err := r.client.Start(); err != nil {
-                       r.Logger.Error(err, "Failed to start transport client on first attempt, starting retry loop")
+                       r.tlog.Error(err, "Failed to start transport client on first attempt, starting retry loop")
 
                        // start the reconnect loop in the background
                        go func() {
                                attempt := 0
                                for {
                                        attempt++
-                                       r.Logger.Info("Reconnection attempt", 
"attempt", attempt, "wait", "3s")
+                                       r.tlog.Info("Reconnection attempt", 
"attempt", attempt, "wait", "3s")
                                        time.Sleep(3 * time.Second)
 
                                        // check whether we should stop
                                        select {
                                        case <-ctx.Done():
-                                               r.Logger.Info("Reconnection 
loop stopped due to context cancellation")
+                                               r.tlog.Info("Reconnection loop 
stopped due to context cancellation")
                                                return
                                        default:
                                        }
 
-                                       r.Logger.Info("Trying to connect to 
manager", "attempt", attempt, "addr", addr)
+                                       r.tlog.Info("Trying to connect to 
manager", "attempt", attempt, "addr", addr)
                                        if err := r.client.Start(); err == nil {
-                                               r.Logger.Info("Successfully 
connected to manager", "attempt", attempt)
+                                               r.tlog.Info("Successfully 
connected to manager", "attempt", attempt)
                                                return
                                        } else {
-                                               r.Logger.Error(err, "Connection 
failed, will retry", "attempt", attempt)
+                                               r.tlog.Error(err, "Connection 
failed, will retry", "attempt", attempt)
                                        }
                                }
                        }()
                }
        }
 
-       r.Logger.Info("Transport runner started successfully, connection will 
be established in background")
+       r.tlog.Info("Transport runner started successfully, connection will be 
established in background")
 
        // create a new context to watch for the shutdown signal
        ctx, cancel := context.WithCancel(ctx)
@@ -207,7 +193,7 @@ func (r *Runner) Start(ctx context.Context) error {
        // listen on ctx.Done for graceful shutdown
        go func() {
                <-ctx.Done()
-               r.Logger.Info("Shutting down transport client...")
+               r.tlog.Info("Shutting down transport client...")
                if r.client != nil {
                        _ = r.client.Shutdown()
                }
@@ -221,13 +207,13 @@ func (r *Runner) Start(ctx context.Context) error {
 func (r *Runner) sendOnlineMessage() {
        if r.client != nil && r.client.IsStarted() {
                // Use the configured identity
-               identity := r.Config.Collector.Identity
+               identity := r.cfg.Config.Collector.Identity
                if identity == "" {
                        identity = DefaultIdentity
                }
 
                // Create CollectorInfo JSON structure as expected by Java server
-               mode := r.Config.Collector.Mode
+               mode := r.cfg.Config.Collector.Mode
                if mode == "" {
                        mode = DefaultMode // Default mode as in Java version
                }
@@ -242,7 +228,7 @@ func (r *Runner) sendOnlineMessage() {
                // Convert to JSON bytes
                jsonData, err := json.Marshal(collectorInfo)
                if err != nil {
-                       r.Logger.Error(err, "Failed to marshal collector info 
to JSON")
+                       r.tlog.Error(err, "Failed to marshal collector info to 
JSON")
                        return
                }
 
@@ -253,12 +239,12 @@ func (r *Runner) sendOnlineMessage() {
                        Msg:       jsonData,
                }
 
-               r.Logger.Info("Sending online message", "identity", identity, 
"type", onlineMsg.Type)
+               r.tlog.Info("Sending online message", "identity", identity, 
"type", onlineMsg.Type)
 
                if err := r.client.SendMsg(onlineMsg); err != nil {
-                       r.Logger.Error(err, "Failed to send online message", 
"identity", identity)
+                       r.tlog.Error(err, "Failed to send online message", 
"identity", identity)
                } else {
-                       r.Logger.Info("Online message sent successfully", 
"identity", identity)
+                       r.tlog.Info("Online message sent successfully", 
"identity", identity)
                }
        }
 }
@@ -270,19 +256,23 @@ func (r *Runner) Info() collector.Info {
 }
 
 func (r *Runner) Close() error {
-       r.Logger.Info("transport close...")
+
+       r.tlog.Info("transport close...")
        if r.client != nil {
                _ = r.client.Shutdown()
        }
+
        return nil
 }
 
 // GetClient returns the transport client (for testing and advanced usage)
 func (r *Runner) GetClient() TransportClient {
+
        return r.client
 }
 
 // IsConnected returns whether the client is connected and started
 func (r *Runner) IsConnected() bool {
+
        return r.client != nil && r.client.IsStarted()
 }
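
CollectorUp is only set once in init(); a natural follow-up would be to flip it from the transport event handlers above. A sketch of that wiring, not part of this commit:

    package transport

    import (
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/metrics"
    )

    // Sketch only: keep the CollectorUp gauge in sync with the connection
    // state by calling this from the existing SetEventHandler callbacks.
    func trackConnectionState(event Event) {
        switch event.Type {
        case EventConnected:
            metrics.CollectorUp.Set(1)
        case EventDisconnected, EventConnectFailed:
            metrics.CollectorUp.Set(0)
        }
    }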
diff --git a/internal/types/config/config_types.go b/internal/types/config/config_types.go
index 4398cce..3dd544c 100644
--- a/internal/types/config/config_types.go
+++ b/internal/types/config/config_types.go
@@ -22,11 +22,12 @@ type CollectorConfig struct {
 }
 
 type CollectorSection struct {
-       Info     CollectorInfo      `yaml:"info"`
-       Log      CollectorLogConfig `yaml:"log"`
-       Manager  ManagerConfig      `yaml:"manager"`
-       Identity string             `yaml:"identity"`
-       Mode     string             `yaml:"mode"`
+       Info          CollectorInfo      `yaml:"info"`
+       Log           CollectorLogConfig `yaml:"log"`
+       Manager       ManagerConfig      `yaml:"manager"`
+       Identity      string             `yaml:"identity"`
+       Mode          string             `yaml:"mode"`
+       MetricsConfig MetricsConfig      `yaml:"metrics"`
        // todo dispatcher
 }
 
@@ -46,3 +47,8 @@ type ManagerConfig struct {
        Port     string `yaml:"port"`
        Protocol string `yaml:"protocol"`
 }
+
+type MetricsConfig struct {
+       // Others?
+       Port int `yaml:"port"`
+}
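
For reference, the new metrics block in etc/hertzbeat-collector.yaml maps onto MetricsConfig through the yaml tags above. A small sketch of that mapping; the real loading path goes through internal/config and is not shown in this diff, and the Collector field path is assumed from the r.cfg.Config.Collector.MetricsConfig.Port usage in metrics.go:

    package main

    import (
        "fmt"

        "gopkg.in/yaml.v3"

        configtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
    )

    // Sketch only: unmarshal a config fragment and read the new metrics port.
    // Assumes CollectorConfig maps the top-level `collector:` key, as the
    // config file layout suggests.
    func main() {
        raw := []byte("collector:\n  metrics:\n    port: 9090\n")

        var cfg configtypes.CollectorConfig
        if err := yaml.Unmarshal(raw, &cfg); err != nil {
            panic(err)
        }
        fmt.Println(cfg.Collector.MetricsConfig.Port) // 9090
    }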


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
