This is an automated email from the ASF dual-hosted git repository.
zhaoqingran pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
The following commit(s) were added to refs/heads/main by this push:
new ffee3d8 feat: add redis collector impl (#30)
ffee3d8 is described below
commit ffee3d83546d2bfb83d5af001083962370d92dbf
Author: shown <[email protected]>
AuthorDate: Thu Dec 4 07:28:46 2025 +0800
feat: add redis collector impl (#30)
---
go.mod | 6 +-
go.sum | 12 +
internal/collector/basic/init.go | 8 +-
internal/collector/basic/jmx/.keep | 0
internal/collector/basic/redis/.keep | 0
internal/collector/basic/redis/redis_collector.go | 345 +++++++++++++++++
.../collector/basic/redis/redis_collector_test.go | 200 ++++++++++
internal/collector/common/transport/transport.go | 420 ++++++++++-----------
.../collector/common/types/job/metrics_types.go | 12 +-
internal/util/param/param_replacer.go | 21 ++
10 files changed, 805 insertions(+), 219 deletions(-)
diff --git a/go.mod b/go.mod
index b924b9a..0891533 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module hertzbeat.apache.org/hertzbeat-collector-go
go 1.24.6
require (
+ github.com/alicebob/miniredis/v2 v2.35.0
github.com/apache/arrow/go/v13 v13.0.0
github.com/expr-lang/expr v1.16.9
github.com/go-logr/logr v1.4.3
@@ -11,6 +12,8 @@ require (
github.com/lib/pq v1.10.9
github.com/microsoft/go-mssqldb v1.9.3
github.com/prometheus/client_golang v1.23.0
+ github.com/prometheus/common v0.65.0
+ github.com/redis/go-redis/v9 v9.17.1
github.com/sijms/go-ora/v2 v2.9.0
github.com/spf13/cobra v1.10.1
github.com/stretchr/testify v1.10.0
@@ -26,6 +29,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
+ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/goccy/go-json v0.10.0 // indirect
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect
github.com/golang-sql/sqlexp v0.1.0 // indirect
@@ -38,10 +42,10 @@ require (
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
- github.com/prometheus/common v0.65.0 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/rogpeppe/go-internal v1.13.1 // indirect
github.com/spf13/pflag v1.0.9 // indirect
+ github.com/yuin/gopher-lua v1.1.1 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/mod v0.25.0 // indirect
diff --git a/go.sum b/go.sum
index b54c994..e28774c 100644
--- a/go.sum
+++ b/go.sum
@@ -12,15 +12,23 @@
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.1.1 h1:bFWuo
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.1.1/go.mod
h1:Vih/3yc6yac2JzU4hzpaDupBJP0Flaia9rXXrU8xyww=
github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2
h1:oygO0locgZJe7PpYPXT5A29ZkwJaPqcva7BVeemZOZs=
github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2/go.mod
h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
+github.com/alicebob/miniredis/v2 v2.35.0
h1:QwLphYqCEAo1eu1TqPRN2jgVMPBweeQcR21jeqDCONI=
+github.com/alicebob/miniredis/v2 v2.35.0/go.mod
h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM=
github.com/apache/arrow/go/v13 v13.0.0
h1:kELrvDQuKZo8csdWYqBQfyi431x6Zs/YJTEgUuSVcWk=
github.com/apache/arrow/go/v13 v13.0.0/go.mod
h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod
h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
+github.com/bsm/ginkgo/v2 v2.12.0
h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
+github.com/bsm/ginkgo/v2 v2.12.0/go.mod
h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
+github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
+github.com/bsm/gomega v1.27.10/go.mod
h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.3.0
h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod
h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f
h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod
h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/expr-lang/expr v1.16.9
h1:WUAzmR0JNI9JCiF0/ewwHB1gmcGw5wW7nWt8gc6PpCI=
github.com/expr-lang/expr v1.16.9/go.mod
h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
@@ -79,6 +87,8 @@ github.com/prometheus/common v0.65.0
h1:QDwzd+G1twt//Kwj/Ww6E9FQq1iVMmODnILtW1t2
github.com/prometheus/common v0.65.0/go.mod
h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8=
github.com/prometheus/procfs v0.16.1
h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
github.com/prometheus/procfs v0.16.1/go.mod
h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
+github.com/redis/go-redis/v9 v9.17.1
h1:7tl732FjYPRT9H9aNfyTwKg9iTETjWjGKEJ2t/5iWTs=
+github.com/redis/go-redis/v9 v9.17.1/go.mod
h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
github.com/rogpeppe/go-internal v1.13.1
h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod
h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/russross/blackfriday/v2 v2.1.0/go.mod
h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
@@ -90,6 +100,8 @@ github.com/spf13/pflag v1.0.9
h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY=
github.com/spf13/pflag v1.0.9/go.mod
h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/testify v1.10.0
h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/yuin/gopher-lua v1.1.1
h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
+github.com/yuin/gopher-lua v1.1.1/go.mod
h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/assert v1.3.0/go.mod
h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
diff --git a/internal/collector/basic/init.go b/internal/collector/basic/init.go
index de47b58..3145b48 100644
--- a/internal/collector/basic/init.go
+++ b/internal/collector/basic/init.go
@@ -22,6 +22,7 @@ package basic
import (
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/database"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/http"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/redis"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/ssh"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
@@ -48,10 +49,11 @@ func init() {
return http.NewHTTPCollector(logger)
})
+ strategy.RegisterFactory("redis", func(logger logger.Logger)
strategy.Collector {
+ return redis.NewRedisCollector(logger)
+ })
+
// More protocols can be added here in the future:
- // strategy.RegisterFactory("redis", func(logger logger.Logger)
strategy.Collector {
- // return redis.NewRedisCollector(logger)
- // })
}
// InitializeAllCollectors initializes all registered collectors
diff --git a/internal/collector/basic/jmx/.keep
b/internal/collector/basic/jmx/.keep
deleted file mode 100644
index e69de29..0000000
diff --git a/internal/collector/basic/redis/.keep
b/internal/collector/basic/redis/.keep
deleted file mode 100644
index e69de29..0000000
diff --git a/internal/collector/basic/redis/redis_collector.go
b/internal/collector/basic/redis/redis_collector.go
new file mode 100644
index 0000000..6341815
--- /dev/null
+++ b/internal/collector/basic/redis/redis_collector.go
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package redis
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/redis/go-redis/v9"
+ "golang.org/x/crypto/ssh"
+
+ sshhelper
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/ssh"
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+const (
+ ProtocolRedis = "redis"
+ ResponseTime = "responseTime"
+ NullValue = consts.NULL_VALUE
+ ClusterPattern = "3"
+ ClusterInfo = "cluster"
+ Identity = "identity"
+)
+
+// RedisCollector implements Redis metrics collection
+type RedisCollector struct {
+ logger logger.Logger
+}
+
+// NewRedisCollector creates a new Redis collector
+func NewRedisCollector(logger logger.Logger) *RedisCollector {
+ return &RedisCollector{
+ logger: logger.WithName("redis-collector"),
+ }
+}
+
+// Protocol returns the protocol this collector supports
+func (rc *RedisCollector) Protocol() string {
+ return ProtocolRedis
+}
+
+// PreCheck validates the Redis metrics configuration
+func (rc *RedisCollector) PreCheck(metrics *jobtypes.Metrics) error {
+ if metrics == nil || metrics.Redis == nil {
+ return fmt.Errorf("redis configuration is required")
+ }
+ if metrics.Redis.Host == "" {
+ return fmt.Errorf("redis host is required")
+ }
+ if metrics.Redis.Port == "" {
+ return fmt.Errorf("redis port is required")
+ }
+ return nil
+}
+
+// Collect performs Redis metrics collection
+func (rc *RedisCollector) Collect(metrics *jobtypes.Metrics)
*jobtypes.CollectRepMetricsData {
+ startTime := time.Now()
+
+ // Create response structure
+ response := &jobtypes.CollectRepMetricsData{
+ Metrics: metrics.Name,
+ Time: startTime.UnixMilli(),
+ Code: consts.CollectSuccess,
+ Msg: "success",
+ Fields: make([]jobtypes.Field, 0),
+ Values: make([]jobtypes.ValueRow, 0),
+ }
+
+ // Initialize fields
+ for _, alias := range metrics.AliasFields {
+ response.Fields = append(response.Fields, jobtypes.Field{
+ Field: alias,
+ Type: 1,
+ })
+ }
+
+ redisConfig := metrics.Redis
+
+ // Parse timeout
+ timeout := 30 * time.Second
+ if redisConfig.Timeout != "" {
+ if t, err := strconv.Atoi(redisConfig.Timeout); err == nil {
+ timeout = time.Duration(t) * time.Millisecond
+ } else if d, err := time.ParseDuration(redisConfig.Timeout);
err == nil {
+ timeout = d
+ }
+ }
+
+ // Create dialer (handles SSH tunnel if configured)
+ dialer, err := rc.createRedisDialer(redisConfig.SSHTunnel)
+ if err != nil {
+ rc.logger.Error(err, "failed to create redis dialer")
+ response.Code = consts.CollectUnConnectable
+ response.Msg = err.Error()
+ return response
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+
+ // Check if cluster mode
+ isCluster := redisConfig.Pattern == ClusterPattern ||
strings.HasPrefix(strings.ToLower(redisConfig.Pattern), ClusterInfo)
+
+ if isCluster {
+ rc.collectCluster(ctx, metrics, redisConfig, dialer, timeout,
response, startTime)
+ } else {
+ rc.collectSingle(ctx, metrics, redisConfig, dialer, timeout,
response, startTime)
+ }
+
+ return response
+}
+
+func (rc *RedisCollector) collectSingle(ctx context.Context, metrics
*jobtypes.Metrics, config *jobtypes.RedisProtocol, dialer func(context.Context,
string, string) (net.Conn, error), timeout time.Duration, response
*jobtypes.CollectRepMetricsData, startTime time.Time) {
+ opts := &redis.Options{
+ Addr: fmt.Sprintf("%s:%s", config.Host, config.Port),
+ Username: config.Username,
+ Password: config.Password,
+ DialTimeout: timeout,
+ ReadTimeout: timeout,
+ }
+ if dialer != nil {
+ opts.Dialer = dialer
+ }
+
+ client := redis.NewClient(opts)
+ defer client.Close()
+
+ info, err := rc.fetchRedisInfo(ctx, client, config.Pattern)
+ if err != nil {
+ rc.logger.Error(err, "failed to collect redis metrics")
+ response.Code = consts.CollectFail
+ response.Msg = err.Error()
+ return
+ }
+
+ responseTime := time.Since(startTime).Milliseconds()
+ parseMap := rc.parseInfo(info)
+ parseMap[strings.ToLower(ResponseTime)] = fmt.Sprintf("%d",
responseTime)
+ parseMap[strings.ToLower(Identity)] = fmt.Sprintf("%s:%s", config.Host,
config.Port)
+
+ rc.addValueRow(response, metrics.AliasFields, parseMap)
+}
+
+func (rc *RedisCollector) collectCluster(ctx context.Context, metrics
*jobtypes.Metrics, config *jobtypes.RedisProtocol, dialer func(context.Context,
string, string) (net.Conn, error), timeout time.Duration, response
*jobtypes.CollectRepMetricsData, startTime time.Time) {
+ opts := &redis.ClusterOptions{
+ Addrs: []string{fmt.Sprintf("%s:%s", config.Host,
config.Port)},
+ Username: config.Username,
+ Password: config.Password,
+ DialTimeout: timeout,
+ ReadTimeout: timeout,
+ }
+ if dialer != nil {
+ opts.Dialer = dialer
+ }
+
+ client := redis.NewClusterClient(opts)
+ defer client.Close()
+
+ // Collect from all masters
+ var wg sync.WaitGroup
+ var mu sync.Mutex
+
+ collectNode := func(nodeClient *redis.Client) {
+ defer wg.Done()
+ info, err := rc.fetchRedisInfo(ctx, nodeClient, config.Pattern)
+ if err != nil {
+ rc.logger.Error(err, "failed to collect redis cluster
node metrics", "addr", nodeClient.Options().Addr)
+ return
+ }
+
+ responseTime := time.Since(startTime).Milliseconds()
+ parseMap := rc.parseInfo(info)
+ parseMap[strings.ToLower(ResponseTime)] = fmt.Sprintf("%d",
responseTime)
+ parseMap[strings.ToLower(Identity)] = nodeClient.Options().Addr
+
+ // If collecting cluster info specifically
+ if metrics.Name == ClusterInfo {
+ clusterInfo, err := nodeClient.ClusterInfo(ctx).Result()
+ if err == nil {
+ clusterMap := rc.parseInfo(clusterInfo)
+ for k, v := range clusterMap {
+ parseMap[k] = v
+ }
+ }
+ }
+
+ mu.Lock()
+ rc.addValueRow(response, metrics.AliasFields, parseMap)
+ mu.Unlock()
+ }
+
+ // Iterate over masters
+ err := client.ForEachMaster(ctx, func(ctx context.Context, nodeClient
*redis.Client) error {
+ wg.Add(1)
+ go collectNode(nodeClient)
+ return nil
+ })
+ if err != nil {
+ rc.logger.Error(err, "failed to iterate masters")
+ }
+
+ // Iterate over slaves
+ err = client.ForEachSlave(ctx, func(ctx context.Context, nodeClient
*redis.Client) error {
+ wg.Add(1)
+ go collectNode(nodeClient)
+ return nil
+ })
+ if err != nil {
+ rc.logger.Error(err, "failed to iterate slaves")
+ }
+
+ wg.Wait()
+
+ if len(response.Values) == 0 {
+ response.Code = consts.CollectFail
+ response.Msg = "no data collected from cluster"
+ }
+}
+
+func (rc *RedisCollector) fetchRedisInfo(ctx context.Context, client
*redis.Client, pattern string) (string, error) {
+ if pattern != "" && strings.HasPrefix(strings.ToLower(pattern),
ClusterInfo) && pattern != ClusterPattern {
+ return client.ClusterInfo(ctx).Result()
+ }
+
+ section := pattern
+ if section == ClusterPattern {
+ section = "" // Default info for pattern "3"
+ }
+
+ if section == "" {
+ return client.Info(ctx).Result()
+ }
+ return client.Info(ctx, section).Result()
+}
+
+func (rc *RedisCollector) addValueRow(response
*jobtypes.CollectRepMetricsData, aliasFields []string, parseMap
map[string]string) {
+ valueRow := jobtypes.ValueRow{
+ Columns: make([]string, len(aliasFields)),
+ }
+
+ for i, alias := range aliasFields {
+ if val, ok := parseMap[strings.ToLower(alias)]; ok {
+ valueRow.Columns[i] = val
+ } else {
+ valueRow.Columns[i] = NullValue
+ }
+ }
+ response.Values = append(response.Values, valueRow)
+}
+
+// parseInfo parses Redis INFO command output into a map
+func (rc *RedisCollector) parseInfo(info string) map[string]string {
+ result := make(map[string]string)
+ lines := strings.Split(info, "\n")
+ for _, line := range lines {
+ line = strings.TrimSpace(line)
+ if line == "" || strings.HasPrefix(line, "#") {
+ continue
+ }
+ parts := strings.SplitN(line, ":", 2)
+ if len(parts) == 2 {
+ key := strings.ToLower(strings.TrimSpace(parts[0]))
+ value := strings.TrimSpace(parts[1])
+ result[key] = value
+ }
+ }
+ return result
+}
+
+// createRedisDialer creates a dialer function that supports SSH tunneling
+func (rc *RedisCollector) createRedisDialer(sshTunnel *jobtypes.SSHTunnel)
(func(context.Context, string, string) (net.Conn, error), error) {
+ if sshTunnel == nil || sshTunnel.Enable != "true" {
+ return nil, nil
+ }
+
+ // Create SSH config
+ sshConfig := &jobtypes.SSHProtocol{
+ Host: sshTunnel.Host,
+ Port: sshTunnel.Port,
+ Username: sshTunnel.Username,
+ Password: sshTunnel.Password,
+ }
+
+ // Use common/ssh helper to create client config
+ clientConfig, err := sshhelper.CreateSSHClientConfig(sshConfig,
rc.logger)
+ if err != nil {
+ return nil, err
+ }
+ clientConfig.Timeout = 30 * time.Second
+
+ return func(ctx context.Context, network, addr string) (net.Conn,
error) {
+ // Connect to SSH server
+ sshClient, err := sshhelper.DialWithContext(ctx, "tcp",
net.JoinHostPort(sshConfig.Host, sshConfig.Port), clientConfig)
+ if err != nil {
+ return nil, fmt.Errorf("failed to connect to ssh
server: %w", err)
+ }
+
+ // Dial target via SSH
+ conn, err := sshClient.Dial(network, addr)
+ if err != nil {
+ sshClient.Close()
+ return nil, fmt.Errorf("failed to dial target via ssh:
%w", err)
+ }
+
+ return &sshConnWrapper{Conn: conn, client: sshClient}, nil
+ }, nil
+}
+
+type sshConnWrapper struct {
+ net.Conn
+ client *ssh.Client
+}
+
+func (w *sshConnWrapper) Close() error {
+ err := w.Conn.Close()
+ if w.client != nil {
+ w.client.Close()
+ }
+ return err
+}
diff --git a/internal/collector/basic/redis/redis_collector_test.go
b/internal/collector/basic/redis/redis_collector_test.go
new file mode 100644
index 0000000..21c0720
--- /dev/null
+++ b/internal/collector/basic/redis/redis_collector_test.go
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package redis
+
+import (
+ "os"
+ "strings"
+ "testing"
+
+ "github.com/alicebob/miniredis/v2"
+ "github.com/stretchr/testify/assert"
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ loggertype
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+ consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+func TestRedisCollector_Protocol(t *testing.T) {
+ log := logger.DefaultLogger(os.Stdout, loggertype.LogLevelDebug)
+ collector := NewRedisCollector(log)
+ assert.Equal(t, ProtocolRedis, collector.Protocol())
+}
+
+func TestRedisCollector_PreCheck(t *testing.T) {
+ log := logger.DefaultLogger(os.Stdout, loggertype.LogLevelDebug)
+ collector := NewRedisCollector(log)
+
+ // Test nil metrics
+ err := collector.PreCheck(nil)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "redis configuration is required")
+
+ // Test missing Redis configuration
+ metrics := &jobtypes.Metrics{
+ Name: "redis_test",
+ }
+ err = collector.PreCheck(metrics)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "redis configuration is required")
+
+ // Test missing host
+ metrics.Redis = &jobtypes.RedisProtocol{
+ Port: "6379",
+ }
+ err = collector.PreCheck(metrics)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "redis host is required")
+
+ // Test missing port
+ metrics.Redis = &jobtypes.RedisProtocol{
+ Host: "localhost",
+ }
+ err = collector.PreCheck(metrics)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "redis port is required")
+
+ // Test valid configuration
+ metrics.Redis = &jobtypes.RedisProtocol{
+ Host: "localhost",
+ Port: "6379",
+ }
+ err = collector.PreCheck(metrics)
+ assert.NoError(t, err)
+}
+
+func TestRedisCollector_Collect(t *testing.T) {
+ // Start miniredis
+ s, err := miniredis.Run()
+ if err != nil {
+ t.Fatalf("Could not start miniredis: %s", err)
+ }
+ defer s.Close()
+
+ log := logger.DefaultLogger(os.Stdout, loggertype.LogLevelDebug)
+ collector := NewRedisCollector(log)
+
+ // Parse host and port from miniredis address
+ addrParts := strings.Split(s.Addr(), ":")
+ host := addrParts[0]
+ port := addrParts[1]
+
+ metrics := &jobtypes.Metrics{
+ Name: "redis_test",
+ Redis: &jobtypes.RedisProtocol{
+ Host: host,
+ Port: port,
+ },
+ AliasFields: []string{"responseTime"},
+ }
+
+ // Execute Collect
+ result := collector.Collect(metrics)
+
+ // Verify result
+ assert.Equal(t, consts.CollectSuccess, result.Code)
+ assert.Equal(t, "success", result.Msg)
+ assert.NotEmpty(t, result.Values)
+ assert.Equal(t, 1, len(result.Values))
+
+ // Verify collected values
+ values := result.Values[0].Columns
+ assert.Equal(t, 1, len(values))
+
+ if values[0] == NullValue {
+ t.Logf("Failed to get responseTime. Value is NullValue.")
+ }
+
+ // responseTime should be present
+ assert.NotEqual(t, NullValue, values[0])
+}
+
+func TestRedisCollector_Collect_Auth(t *testing.T) {
+ // Start miniredis with auth
+ s, err := miniredis.Run()
+ if err != nil {
+ t.Fatalf("Could not start miniredis: %s", err)
+ }
+ defer s.Close()
+ s.RequireAuth("password123")
+
+ log := logger.DefaultLogger(os.Stdout, loggertype.LogLevelDebug)
+ collector := NewRedisCollector(log)
+
+ addrParts := strings.Split(s.Addr(), ":")
+ host := addrParts[0]
+ port := addrParts[1]
+
+ // Test with correct password
+ metrics := &jobtypes.Metrics{
+ Name: "redis_auth_test",
+ Redis: &jobtypes.RedisProtocol{
+ Host: host,
+ Port: port,
+ Password: "password123",
+ },
+ AliasFields: []string{"redis_version"},
+ }
+
+ result := collector.Collect(metrics)
+ assert.Equal(t, consts.CollectSuccess, result.Code)
+
+ // Test with incorrect password
+ metrics.Redis.Password = "wrong_password"
+ result = collector.Collect(metrics)
+ assert.Equal(t, consts.CollectFail, result.Code)
+}
+
+func TestRedisCollector_ParseInfo(t *testing.T) {
+ log := logger.DefaultLogger(os.Stdout, loggertype.LogLevelDebug)
+ collector := NewRedisCollector(log)
+
+ info := `# Server
+redis_version:6.2.6
+redis_git_sha1:00000000
+redis_git_dirty:0
+redis_build_id:c6f3693d1ac7923d
+redis_mode:standalone
+os:Linux 5.10.76-linuxkit x86_64
+arch_bits:64
+multiplexing_api:epoll
+atomicvar_api:atomic-builtin
+gcc_version:10.2.1
+process_id:1
+process_supervised:no
+run_id:d55354296781d79907a72527854463a2636629d6
+tcp_port:6379
+server_time_usec:1645518888888888
+uptime_in_seconds:3600
+uptime_in_days:1
+
+# Clients
+connected_clients:1
+cluster_enabled:0
+maxmemory:0`
+
+ parsed := collector.parseInfo(info)
+
+ assert.Equal(t, "6.2.6", parsed["redis_version"])
+ assert.Equal(t, "standalone", parsed["redis_mode"])
+ assert.Equal(t, "1", parsed["connected_clients"])
+ assert.Equal(t, "0", parsed["cluster_enabled"])
+ assert.Equal(t, "1", parsed["uptime_in_days"])
+}
diff --git a/internal/collector/common/transport/transport.go
b/internal/collector/common/transport/transport.go
index 44058e3..f2edf73 100644
--- a/internal/collector/common/transport/transport.go
+++ b/internal/collector/common/transport/transport.go
@@ -18,260 +18,260 @@
package transport
import (
- "context"
- "encoding/json"
- "fmt"
- "os"
-
- pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
- clrserver
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector"
- configtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config"
- loggertypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
- config
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config"
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+ "context"
+ "encoding/json"
+ "fmt"
+ "os"
+
+ pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
+ clrserver
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector"
+ configtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config"
+ loggertypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
const (
- // DefaultManagerAddr is the default manager server address (Java Netty
default port)
- DefaultManagerAddr = "127.0.0.1:1158"
- // DefaultProtocol is the default communication protocol for Java
compatibility
- DefaultProtocol = "netty"
- // DefaultMode is the default operation mode
- DefaultMode = "public"
- // DefaultIdentity is the default collector identity
- DefaultIdentity = "collector-go"
+ // DefaultManagerAddr is the default manager server address (Java Netty
default port)
+ DefaultManagerAddr = "127.0.0.1:1158"
+ // DefaultProtocol is the default communication protocol for Java
compatibility
+ DefaultProtocol = "netty"
+ // DefaultMode is the default operation mode
+ DefaultMode = "public"
+ // DefaultIdentity is the default collector identity
+ DefaultIdentity = "collector-go"
)
type Runner struct {
- Config *configtypes.CollectorConfig
- client transport.TransportClient
- jobScheduler transport.JobScheduler
- clrserver.Server
+ Config *configtypes.CollectorConfig
+ client transport.TransportClient
+ jobScheduler transport.JobScheduler
+ clrserver.Server
}
func New(cfg *configtypes.CollectorConfig) *Runner {
- return &Runner{
- Config: cfg,
- Server: clrserver.Server{
- Logger: logger.Logger{}, // Will be initialized
properly in Start method
- },
- }
+ return &Runner{
+ Config: cfg,
+ Server: clrserver.Server{
+ Logger: logger.Logger{}, // Will be initialized properly in Start method
+ },
+ }
}
// SetJobScheduler sets the job scheduler for the transport runner
func (r *Runner) SetJobScheduler(scheduler transport.JobScheduler) {
- r.jobScheduler = scheduler
+ r.jobScheduler = scheduler
}
// NewFromConfig creates a new transport runner from collector configuration
func NewFromConfig(cfg *configtypes.CollectorConfig) *Runner {
- if cfg == nil {
- return nil
- }
- return New(cfg)
+ if cfg == nil {
+ return nil
+ }
+ return New(cfg)
}
// NewFromEnv creates a new transport runner from environment variables
func NewFromEnv() *Runner {
- envLoader := config.NewEnvConfigLoader()
- cfg := envLoader.LoadFromEnv()
- return NewFromConfig(cfg)
+ envLoader := config.NewEnvConfigLoader()
+ cfg := envLoader.LoadFromEnv()
+ return NewFromConfig(cfg)
}
// NewFromUnifiedConfig creates a new transport runner using unified
configuration loading
// It loads from file first, then overrides with environment variables
func NewFromUnifiedConfig(cfgPath string) (*Runner, error) {
- unifiedLoader := config.NewUnifiedConfigLoader(cfgPath)
- cfg, err := unifiedLoader.Load()
- if err != nil {
- return nil, err
- }
- return NewFromConfig(cfg), nil
+ unifiedLoader := config.NewUnifiedConfigLoader(cfgPath)
+ cfg, err := unifiedLoader.Load()
+ if err != nil {
+ return nil, err
+ }
+ return NewFromConfig(cfg), nil
}
func (r *Runner) Start(ctx context.Context) error {
- // 初始化 Logger 如果它还没有被设置
- if r.Logger.IsZero() {
- r.Logger = logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo)
- }
- r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", r.Info().Name)
- r.Logger.Info("Starting transport client")
-
- // 构建 server 地址
- addr := fmt.Sprintf("%s:%s", r.Config.Collector.Manager.Host, r.Config.Collector.Manager.Port)
- if addr == ":" {
- // 如果配置为空,使用环境变量或默认值
- if v := os.Getenv("MANAGER_ADDR"); v != "" {
- addr = v
- } else {
- addr = DefaultManagerAddr
- }
- }
-
- // 确定协议
- protocol := r.Config.Collector.Manager.Protocol
- if protocol == "" {
- if v := os.Getenv("MANAGER_PROTOCOL"); v != "" {
- protocol = v
- } else {
- protocol = DefaultProtocol
- }
- }
-
- r.Logger.Info("Connecting to manager server", "addr", addr, "protocol", protocol)
-
- // 创建客户端
- factory := &transport.TransportClientFactory{}
- client, err := factory.CreateClient(protocol, addr)
- if err != nil {
- r.Logger.Error(err, "Failed to create transport client")
- return err
- }
-
- // Set the identity on the client if it supports it
- identity := r.Config.Collector.Identity
- if identity == "" {
- identity = DefaultIdentity
- }
-
- if nettyClient, ok := client.(*transport.NettyClient); ok {
- nettyClient.SetIdentity(identity)
- }
-
- r.client = client
-
- // 设置事件处理器
- switch c := client.(type) {
- case *transport.GrpcClient:
- c.SetEventHandler(func(event transport.Event) {
- switch event.Type {
- case transport.EventConnected:
- r.Logger.Info("Connected to manager gRPC server", "addr", event.Address)
- go r.sendOnlineMessage()
- case transport.EventDisconnected:
- r.Logger.Info("Disconnected from manager gRPC server", "addr", event.Address)
- case transport.EventConnectFailed:
- r.Logger.Error(event.Error, "Failed to connect to manager gRPC server", "addr", event.Address)
- }
- })
- // Register processors with job scheduler
- if r.jobScheduler != nil {
- transport.RegisterDefaultProcessors(c, r.jobScheduler)
- r.Logger.Info("Registered gRPC processors with job scheduler")
- } else {
- transport.RegisterDefaultProcessors(c, nil)
- r.Logger.Info("Registered gRPC processors without job scheduler")
- }
- case *transport.NettyClient:
- c.SetEventHandler(func(event transport.Event) {
- switch event.Type {
- case transport.EventConnected:
- r.Logger.Info("Connected to manager netty server", "addr", event.Address)
- go r.sendOnlineMessage()
- case transport.EventDisconnected:
- r.Logger.Info("Disconnected from manager netty server", "addr", event.Address)
- case transport.EventConnectFailed:
- r.Logger.Error(event.Error, "Failed to connect to manager netty server", "addr", event.Address)
- }
- })
- // Register processors with job scheduler
- if r.jobScheduler != nil {
- transport.RegisterDefaultNettyProcessors(c, r.jobScheduler)
- r.Logger.Info("Registered netty processors with job scheduler")
- } else {
- transport.RegisterDefaultNettyProcessors(c, nil)
- r.Logger.Info("Registered netty processors without job scheduler")
- }
- }
-
- if err := r.client.Start(); err != nil {
- r.Logger.Error(err, "Failed to start transport client")
- return err
- }
-
- // 创建新的context用于监控关闭信号
- ctx, cancel := context.WithCancel(ctx)
- defer cancel()
-
- // 监听 ctx.Done 优雅关闭
- go func() {
- <-ctx.Done()
- r.Logger.Info("Shutting down transport client...")
- _ = r.client.Shutdown()
- }()
-
- // 阻塞直到 ctx.Done
- <-ctx.Done()
- return nil
+ // 初始化 Logger 如果它还没有被设置
+ if r.Logger.IsZero() {
+ r.Logger = logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo)
+ }
+ r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", r.Info().Name)
+ r.Logger.Info("Starting transport client")
+
+ // 构建 server 地址
+ addr := fmt.Sprintf("%s:%s", r.Config.Collector.Manager.Host, r.Config.Collector.Manager.Port)
+ if addr == ":" {
+ // 如果配置为空,使用环境变量或默认值
+ if v := os.Getenv("MANAGER_ADDR"); v != "" {
+ addr = v
+ } else {
+ addr = DefaultManagerAddr
+ }
+ }
+
+ // 确定协议
+ protocol := r.Config.Collector.Manager.Protocol
+ if protocol == "" {
+ if v := os.Getenv("MANAGER_PROTOCOL"); v != "" {
+ protocol = v
+ } else {
+ protocol = DefaultProtocol
+ }
+ }
+
+ r.Logger.Info("Connecting to manager server", "addr", addr, "protocol", protocol)
+
+ // 创建客户端
+ factory := &transport.TransportClientFactory{}
+ client, err := factory.CreateClient(protocol, addr)
+ if err != nil {
+ r.Logger.Error(err, "Failed to create transport client")
+ return err
+ }
+
+ // Set the identity on the client if it supports it
+ identity := r.Config.Collector.Identity
+ if identity == "" {
+ identity = DefaultIdentity
+ }
+
+ if nettyClient, ok := client.(*transport.NettyClient); ok {
+ nettyClient.SetIdentity(identity)
+ }
+
+ r.client = client
+
+ // 设置事件处理器
+ switch c := client.(type) {
+ case *transport.GrpcClient:
+ c.SetEventHandler(func(event transport.Event) {
+ switch event.Type {
+ case transport.EventConnected:
+ r.Logger.Info("Connected to manager gRPC server", "addr", event.Address)
+ go r.sendOnlineMessage()
+ case transport.EventDisconnected:
+ r.Logger.Info("Disconnected from manager gRPC server", "addr", event.Address)
+ case transport.EventConnectFailed:
+ r.Logger.Error(event.Error, "Failed to connect to manager gRPC server", "addr", event.Address)
+ }
+ })
+ // Register processors with job scheduler
+ if r.jobScheduler != nil {
+ transport.RegisterDefaultProcessors(c, r.jobScheduler)
+ r.Logger.Info("Registered gRPC processors with job scheduler")
+ } else {
+ transport.RegisterDefaultProcessors(c, nil)
+ r.Logger.Info("Registered gRPC processors without job scheduler")
+ }
+ case *transport.NettyClient:
+ c.SetEventHandler(func(event transport.Event) {
+ switch event.Type {
+ case transport.EventConnected:
+ r.Logger.Info("Connected to manager netty server", "addr", event.Address)
+ go r.sendOnlineMessage()
+ case transport.EventDisconnected:
+ r.Logger.Info("Disconnected from manager netty server", "addr", event.Address)
+ case transport.EventConnectFailed:
+ r.Logger.Error(event.Error, "Failed to connect to manager netty server", "addr", event.Address)
+ }
+ })
+ // Register processors with job scheduler
+ if r.jobScheduler != nil {
+ transport.RegisterDefaultNettyProcessors(c, r.jobScheduler)
+ r.Logger.Info("Registered netty processors with job scheduler")
+ } else {
+ transport.RegisterDefaultNettyProcessors(c, nil)
+ r.Logger.Info("Registered netty processors without job scheduler")
+ }
+ }
+
+ if err := r.client.Start(); err != nil {
+ r.Logger.Error(err, "Failed to start transport client")
+ return err
+ }
+
+ // 创建新的context用于监控关闭信号
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ // 监听 ctx.Done 优雅关闭
+ go func() {
+ <-ctx.Done()
+ r.Logger.Info("Shutting down transport client...")
+ _ = r.client.Shutdown()
+ }()
+
+ // 阻塞直到 ctx.Done
+ <-ctx.Done()
+ return nil
}
func (r *Runner) sendOnlineMessage() {
- if r.client != nil && r.client.IsStarted() {
- // Use the configured identity
- identity := r.Config.Collector.Identity
- if identity == "" {
- identity = DefaultIdentity
- }
-
- // Create CollectorInfo JSON structure as expected by Java server
- mode := r.Config.Collector.Mode
- if mode == "" {
- mode = DefaultMode // Default mode as in Java version
- }
-
- collectorInfo := map[string]interface{}{
- "name": identity,
- "ip": "", // Let server detect IP
- "version": "1.0.0",
- "mode": mode,
- }
-
- // Convert to JSON bytes
- jsonData, err := json.Marshal(collectorInfo)
- if err != nil {
- r.Logger.Error(err, "Failed to marshal collector info to JSON")
- return
- }
-
- onlineMsg := &pb.Message{
- Type: pb.MessageType_GO_ONLINE,
- Direction: pb.Direction_REQUEST,
- Identity: identity,
- Msg: jsonData,
- }
-
- r.Logger.Info("Sending online message", "identity", identity, "type", onlineMsg.Type)
-
- if err := r.client.SendMsg(onlineMsg); err != nil {
- r.Logger.Error(err, "Failed to send online message", "identity", identity)
- } else {
- r.Logger.Info("Online message sent successfully", "identity", identity)
- }
- }
+ if r.client != nil && r.client.IsStarted() {
+ // Use the configured identity
+ identity := r.Config.Collector.Identity
+ if identity == "" {
+ identity = DefaultIdentity
+ }
+
+ // Create CollectorInfo JSON structure as expected by Java server
+ mode := r.Config.Collector.Mode
+ if mode == "" {
+ mode = DefaultMode // Default mode as in Java version
+ }
+
+ collectorInfo := map[string]interface{}{
+ "name": identity,
+ "ip": "", // Let server detect IP
+ "version": "1.0.0",
+ "mode": mode,
+ }
+
+ // Convert to JSON bytes
+ jsonData, err := json.Marshal(collectorInfo)
+ if err != nil {
+ r.Logger.Error(err, "Failed to marshal collector info to JSON")
+ return
+ }
+
+ onlineMsg := &pb.Message{
+ Type: pb.MessageType_GO_ONLINE,
+ Direction: pb.Direction_REQUEST,
+ Identity: identity,
+ Msg: jsonData,
+ }
+
+ r.Logger.Info("Sending online message", "identity", identity, "type", onlineMsg.Type)
+
+ if err := r.client.SendMsg(onlineMsg); err != nil {
+ r.Logger.Error(err, "Failed to send online message", "identity", identity)
+ } else {
+ r.Logger.Info("Online message sent successfully", "identity", identity)
+ }
+ }
}
func (r *Runner) Info() collector.Info {
- return collector.Info{
- Name: "transport",
- }
+ return collector.Info{
+ Name: "transport",
+ }
}
func (r *Runner) Close() error {
- r.Logger.Info("transport close...")
- if r.client != nil {
- _ = r.client.Shutdown()
- }
- return nil
+ r.Logger.Info("transport close...")
+ if r.client != nil {
+ _ = r.client.Shutdown()
+ }
+ return nil
}
// GetClient returns the transport client (for testing and advanced usage)
func (r *Runner) GetClient() transport.TransportClient {
- return r.client
+ return r.client
}
// IsConnected returns whether the client is connected and started
func (r *Runner) IsConnected() bool {
- return r.client != nil && r.client.IsStarted()
+ return r.client != nil && r.client.IsStarted()
}
diff --git a/internal/collector/common/types/job/metrics_types.go b/internal/collector/common/types/job/metrics_types.go
index 293811a..b783c21 100644
--- a/internal/collector/common/types/job/metrics_types.go
+++ b/internal/collector/common/types/job/metrics_types.go
@@ -259,11 +259,13 @@ type JMXProtocol struct {
// RedisProtocol represents Redis protocol configuration
type RedisProtocol struct {
- Host string `json:"host"`
- Port int `json:"port"`
- Password string `json:"password"`
- Pattern string `json:"pattern"`
- Timeout int `json:"timeout"`
+ Host string `json:"host"`
+ Port string `json:"port"`
+ Username string `json:"username"`
+ Password string `json:"password"`
+ Pattern string `json:"pattern"`
+ Timeout string `json:"timeout"`
+ SSHTunnel *SSHTunnel `json:"sshTunnel,omitempty"`
}
// MongoDBProtocol represents MongoDB protocol configuration
diff --git a/internal/util/param/param_replacer.go b/internal/util/param/param_replacer.go
index 5e95eed..0aeec7d 100644
--- a/internal/util/param/param_replacer.go
+++ b/internal/util/param/param_replacer.go
@@ -148,6 +148,11 @@ func (r *Replacer) ReplaceMetricsParams(metrics *jobtypes.Metrics, paramMap map[
r.replaceHTTPParams(metrics.HTTP, paramMap)
}
+ // 4. Redis
+ if metrics.Redis != nil {
+ r.replaceRedisParams(metrics.Redis, paramMap)
+ }
+
// Replace parameters in basic metrics fields
if err := r.replaceBasicMetricsParams(metrics, paramMap); err != nil {
return fmt.Errorf("failed to replace basic metrics params: %w", err)
@@ -204,6 +209,22 @@ func (r *Replacer) replaceHTTPParams(http *jobtypes.HTTPProtocol, paramMap map[s
}
}
+// replaceRedisParams specific replacement logic for RedisProtocol struct
+func (r *Replacer) replaceRedisParams(redis *jobtypes.RedisProtocol, paramMap map[string]string) {
+ redis.Host = r.replaceParamPlaceholders(redis.Host, paramMap)
+ redis.Port = r.replaceParamPlaceholders(redis.Port, paramMap)
+ redis.Username = r.replaceParamPlaceholders(redis.Username, paramMap)
+ redis.Password = r.replaceParamPlaceholders(redis.Password, paramMap)
+ redis.Pattern = r.replaceParamPlaceholders(redis.Pattern, paramMap)
+ redis.Timeout = r.replaceParamPlaceholders(redis.Timeout, paramMap)
+ if redis.SSHTunnel != nil {
+ redis.SSHTunnel.Host = r.replaceParamPlaceholders(redis.SSHTunnel.Host, paramMap)
+ redis.SSHTunnel.Port = r.replaceParamPlaceholders(redis.SSHTunnel.Port, paramMap)
+ redis.SSHTunnel.Username = r.replaceParamPlaceholders(redis.SSHTunnel.Username, paramMap)
+ redis.SSHTunnel.Password = r.replaceParamPlaceholders(redis.SSHTunnel.Password, paramMap)
+ }
+}
+
// replaceProtocolParams replaces parameters in any protocol configuration defined as interface{}
func (r *Replacer) replaceProtocolParams(protocolInterface *interface{}, paramMap map[string]string) error {
if *protocolInterface == nil {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]