This is an automated email from the ASF dual-hosted git repository.

shown pushed a commit to branch 1130-yuluo/opt
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git

commit c7e6128252fde929ebe301ce61cf109e7523fe6a
Author: yuluo-yx <[email protected]>
AuthorDate: Sun Nov 30 15:00:18 2025 +0800

    feat: add redis collector impl
    
    Signed-off-by: yuluo-yx <[email protected]>
---
 go.mod                                             |   6 +-
 go.sum                                             |  12 +
 internal/collector/basic/init.go                   |   8 +-
 internal/collector/basic/jmx/.keep                 |   0
 internal/collector/basic/redis/.keep               |   0
 internal/collector/basic/redis/redis_collector.go  | 345 +++++++++++++++++
 .../collector/basic/redis/redis_collector_test.go  | 200 ++++++++++
 internal/collector/common/transport/transport.go   | 420 ++++++++++-----------
 .../collector/common/types/job/metrics_types.go    |  12 +-
 internal/util/param/param_replacer.go              |  21 ++
 10 files changed, 805 insertions(+), 219 deletions(-)

diff --git a/go.mod b/go.mod
index b924b9a..0891533 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module hertzbeat.apache.org/hertzbeat-collector-go
 go 1.24.6
 
 require (
+       github.com/alicebob/miniredis/v2 v2.35.0
        github.com/apache/arrow/go/v13 v13.0.0
        github.com/expr-lang/expr v1.16.9
        github.com/go-logr/logr v1.4.3
@@ -11,6 +12,8 @@ require (
        github.com/lib/pq v1.10.9
        github.com/microsoft/go-mssqldb v1.9.3
        github.com/prometheus/client_golang v1.23.0
+       github.com/prometheus/common v0.65.0
+       github.com/redis/go-redis/v9 v9.17.1
        github.com/sijms/go-ora/v2 v2.9.0
        github.com/spf13/cobra v1.10.1
        github.com/stretchr/testify v1.10.0
@@ -26,6 +29,7 @@ require (
        github.com/beorn7/perks v1.0.1 // indirect
        github.com/cespare/xxhash/v2 v2.3.0 // indirect
        github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // 
indirect
+       github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // 
indirect
        github.com/goccy/go-json v0.10.0 // indirect
        github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // 
indirect
        github.com/golang-sql/sqlexp v0.1.0 // indirect
@@ -38,10 +42,10 @@ require (
        github.com/pierrec/lz4/v4 v4.1.17 // indirect
        github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // 
indirect
        github.com/prometheus/client_model v0.6.2 // indirect
-       github.com/prometheus/common v0.65.0 // indirect
        github.com/prometheus/procfs v0.16.1 // indirect
        github.com/rogpeppe/go-internal v1.13.1 // indirect
        github.com/spf13/pflag v1.0.9 // indirect
+       github.com/yuin/gopher-lua v1.1.1 // indirect
        github.com/zeebo/xxh3 v1.0.2 // indirect
        go.uber.org/multierr v1.11.0 // indirect
        golang.org/x/mod v0.25.0 // indirect
diff --git a/go.sum b/go.sum
index b54c994..e28774c 100644
--- a/go.sum
+++ b/go.sum
@@ -12,15 +12,23 @@ 
github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.1.1 h1:bFWuo
 github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.1.1/go.mod 
h1:Vih/3yc6yac2JzU4hzpaDupBJP0Flaia9rXXrU8xyww=
 github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2 
h1:oygO0locgZJe7PpYPXT5A29ZkwJaPqcva7BVeemZOZs=
 github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2/go.mod 
h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
+github.com/alicebob/miniredis/v2 v2.35.0 
h1:QwLphYqCEAo1eu1TqPRN2jgVMPBweeQcR21jeqDCONI=
+github.com/alicebob/miniredis/v2 v2.35.0/go.mod 
h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM=
 github.com/apache/arrow/go/v13 v13.0.0 
h1:kELrvDQuKZo8csdWYqBQfyi431x6Zs/YJTEgUuSVcWk=
 github.com/apache/arrow/go/v13 v13.0.0/go.mod 
h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc=
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod 
h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
+github.com/bsm/ginkgo/v2 v2.12.0 
h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
+github.com/bsm/ginkgo/v2 v2.12.0/go.mod 
h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
+github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
+github.com/bsm/gomega v1.27.10/go.mod 
h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
 github.com/cespare/xxhash/v2 v2.3.0 
h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
 github.com/cespare/xxhash/v2 v2.3.0/go.mod 
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod 
h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc 
h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f 
h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod 
h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
 github.com/expr-lang/expr v1.16.9 
h1:WUAzmR0JNI9JCiF0/ewwHB1gmcGw5wW7nWt8gc6PpCI=
 github.com/expr-lang/expr v1.16.9/go.mod 
h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
 github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
@@ -79,6 +87,8 @@ github.com/prometheus/common v0.65.0 
h1:QDwzd+G1twt//Kwj/Ww6E9FQq1iVMmODnILtW1t2
 github.com/prometheus/common v0.65.0/go.mod 
h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8=
 github.com/prometheus/procfs v0.16.1 
h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
 github.com/prometheus/procfs v0.16.1/go.mod 
h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
+github.com/redis/go-redis/v9 v9.17.1 
h1:7tl732FjYPRT9H9aNfyTwKg9iTETjWjGKEJ2t/5iWTs=
+github.com/redis/go-redis/v9 v9.17.1/go.mod 
h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
 github.com/rogpeppe/go-internal v1.13.1 
h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
 github.com/rogpeppe/go-internal v1.13.1/go.mod 
h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
 github.com/russross/blackfriday/v2 v2.1.0/go.mod 
h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
@@ -90,6 +100,8 @@ github.com/spf13/pflag v1.0.9 
h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY=
 github.com/spf13/pflag v1.0.9/go.mod 
h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
 github.com/stretchr/testify v1.10.0 
h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
 github.com/stretchr/testify v1.10.0/go.mod 
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/yuin/gopher-lua v1.1.1 
h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
+github.com/yuin/gopher-lua v1.1.1/go.mod 
h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
 github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
 github.com/zeebo/assert v1.3.0/go.mod 
h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
 github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
diff --git a/internal/collector/basic/init.go b/internal/collector/basic/init.go
index de47b58..3145b48 100644
--- a/internal/collector/basic/init.go
+++ b/internal/collector/basic/init.go
@@ -22,6 +22,7 @@ package basic
 import (
        
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/database"
        
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/http"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/redis"
        
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/ssh"
        
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
@@ -48,10 +49,11 @@ func init() {
                return http.NewHTTPCollector(logger)
        })
 
+       strategy.RegisterFactory("redis", func(logger logger.Logger) 
strategy.Collector {
+               return redis.NewRedisCollector(logger)
+       })
+
        // More protocols can be added here in the future:
-       // strategy.RegisterFactory("redis", func(logger logger.Logger) 
strategy.Collector {
-       //     return redis.NewRedisCollector(logger)
-       // })
 }
 
 // InitializeAllCollectors initializes all registered collectors
diff --git a/internal/collector/basic/jmx/.keep 
b/internal/collector/basic/jmx/.keep
deleted file mode 100644
index e69de29..0000000
diff --git a/internal/collector/basic/redis/.keep 
b/internal/collector/basic/redis/.keep
deleted file mode 100644
index e69de29..0000000
diff --git a/internal/collector/basic/redis/redis_collector.go 
b/internal/collector/basic/redis/redis_collector.go
new file mode 100644
index 0000000..6341815
--- /dev/null
+++ b/internal/collector/basic/redis/redis_collector.go
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package redis
+
+import (
+       "context"
+       "fmt"
+       "net"
+       "strconv"
+       "strings"
+       "sync"
+       "time"
+
+       "github.com/redis/go-redis/v9"
+       "golang.org/x/crypto/ssh"
+
+       sshhelper 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/ssh"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// Protocol name, synthetic field names and pattern markers used by the Redis collector.
+const (
+       // ProtocolRedis is the protocol name returned by RedisCollector.Protocol.
+       ProtocolRedis  = "redis"
+       // ResponseTime is the synthetic field (looked up lower-cased) that carries
+       // the elapsed collection time in milliseconds.
+       ResponseTime   = "responseTime"
+       // NullValue is written into a column when no value was collected for an alias.
+       NullValue      = consts.NULL_VALUE
+       // ClusterPattern is the Pattern value that makes Collect use the cluster client.
+       ClusterPattern = "3"
+       // ClusterInfo is both the pattern prefix that selects CLUSTER INFO and the
+       // metrics name that merges CLUSTER INFO into per-node rows.
+       ClusterInfo    = "cluster"
+       // Identity is the synthetic field (looked up lower-cased) that carries the
+       // address of the node a row was collected from.
+       Identity       = "identity"
+)
+
+// RedisCollector implements Redis metrics collection.
+// It carries only a logger; the Redis clients are created and closed inside
+// each collection call.
+type RedisCollector struct {
+       // logger is the collector-scoped logger set up by NewRedisCollector.
+       logger logger.Logger
+}
+
+// NewRedisCollector builds a RedisCollector whose log output is scoped under
+// the "redis-collector" name.
+func NewRedisCollector(logger logger.Logger) *RedisCollector {
+       named := logger.WithName("redis-collector")
+       return &RedisCollector{logger: named}
+}
+
+// Protocol returns the protocol this collector supports.
+// The value is the ProtocolRedis constant ("redis").
+func (rc *RedisCollector) Protocol() string {
+       return ProtocolRedis
+}
+
+// PreCheck verifies that the metrics definition carries a Redis configuration
+// with a non-empty host and port. It returns an error describing the first
+// missing piece, or nil when the configuration is usable.
+func (rc *RedisCollector) PreCheck(metrics *jobtypes.Metrics) error {
+       switch {
+       case metrics == nil || metrics.Redis == nil:
+               return fmt.Errorf("redis configuration is required")
+       case metrics.Redis.Host == "":
+               return fmt.Errorf("redis host is required")
+       case metrics.Redis.Port == "":
+               return fmt.Errorf("redis port is required")
+       }
+       return nil
+}
+
+// Collect performs Redis metrics collection
+func (rc *RedisCollector) Collect(metrics *jobtypes.Metrics) 
*jobtypes.CollectRepMetricsData {
+       startTime := time.Now()
+
+       // Create response structure
+       response := &jobtypes.CollectRepMetricsData{
+               Metrics: metrics.Name,
+               Time:    startTime.UnixMilli(),
+               Code:    consts.CollectSuccess,
+               Msg:     "success",
+               Fields:  make([]jobtypes.Field, 0),
+               Values:  make([]jobtypes.ValueRow, 0),
+       }
+
+       // Initialize fields
+       for _, alias := range metrics.AliasFields {
+               response.Fields = append(response.Fields, jobtypes.Field{
+                       Field: alias,
+                       Type:  1,
+               })
+       }
+
+       redisConfig := metrics.Redis
+
+       // Parse timeout
+       timeout := 30 * time.Second
+       if redisConfig.Timeout != "" {
+               if t, err := strconv.Atoi(redisConfig.Timeout); err == nil {
+                       timeout = time.Duration(t) * time.Millisecond
+               } else if d, err := time.ParseDuration(redisConfig.Timeout); 
err == nil {
+                       timeout = d
+               }
+       }
+
+       // Create dialer (handles SSH tunnel if configured)
+       dialer, err := rc.createRedisDialer(redisConfig.SSHTunnel)
+       if err != nil {
+               rc.logger.Error(err, "failed to create redis dialer")
+               response.Code = consts.CollectUnConnectable
+               response.Msg = err.Error()
+               return response
+       }
+
+       ctx, cancel := context.WithTimeout(context.Background(), timeout)
+       defer cancel()
+
+       // Check if cluster mode
+       isCluster := redisConfig.Pattern == ClusterPattern || 
strings.HasPrefix(strings.ToLower(redisConfig.Pattern), ClusterInfo)
+
+       if isCluster {
+               rc.collectCluster(ctx, metrics, redisConfig, dialer, timeout, 
response, startTime)
+       } else {
+               rc.collectSingle(ctx, metrics, redisConfig, dialer, timeout, 
response, startTime)
+       }
+
+       return response
+}
+
+// collectSingle collects one value row from a standalone Redis instance.
+//
+// It opens a client for config.Host:config.Port (through dialer when it is
+// non-nil), fetches INFO via fetchRedisInfo, adds the synthetic response-time
+// and identity fields, and appends a single row to response. On failure it
+// sets response.Code to CollectFail and response.Msg to the error text.
+func (rc *RedisCollector) collectSingle(
+       ctx context.Context,
+       metrics *jobtypes.Metrics,
+       config *jobtypes.RedisProtocol,
+       dialer func(context.Context, string, string) (net.Conn, error),
+       timeout time.Duration,
+       response *jobtypes.CollectRepMetricsData,
+       startTime time.Time,
+) {
+       // JoinHostPort brackets IPv6 literals; a plain "%s:%s" would produce an
+       // address that cannot be dialed. Brackets the user already typed are
+       // stripped first so they are not doubled.
+       addr := net.JoinHostPort(strings.Trim(config.Host, "[]"), config.Port)
+
+       opts := &redis.Options{
+               Addr:        addr,
+               Username:    config.Username,
+               Password:    config.Password,
+               DialTimeout: timeout,
+               ReadTimeout: timeout,
+       }
+       if dialer != nil {
+               opts.Dialer = dialer
+       }
+
+       client := redis.NewClient(opts)
+       defer client.Close()
+
+       info, err := rc.fetchRedisInfo(ctx, client, config.Pattern)
+       if err != nil {
+               rc.logger.Error(err, "failed to collect redis metrics")
+               response.Code = consts.CollectFail
+               response.Msg = err.Error()
+               return
+       }
+
+       // Response time is measured from the start of Collect, not from the
+       // moment the command was sent.
+       elapsed := time.Since(startTime).Milliseconds()
+       parseMap := rc.parseInfo(info)
+       parseMap[strings.ToLower(ResponseTime)] = strconv.FormatInt(elapsed, 10)
+       parseMap[strings.ToLower(Identity)] = addr
+
+       rc.addValueRow(response, metrics.AliasFields, parseMap)
+}
+
+// collectCluster collects one value row per reachable cluster node.
+//
+// It seeds a cluster client with config.Host:config.Port, visits every master
+// and every replica, and appends a row for each node whose INFO could be
+// fetched. Nodes that fail are logged and skipped. When metrics.Name equals
+// ClusterInfo, the node's CLUSTER INFO output is merged into its row. If no
+// row was collected at all, response is marked CollectFail.
+func (rc *RedisCollector) collectCluster(
+       ctx context.Context,
+       metrics *jobtypes.Metrics,
+       config *jobtypes.RedisProtocol,
+       dialer func(context.Context, string, string) (net.Conn, error),
+       timeout time.Duration,
+       response *jobtypes.CollectRepMetricsData,
+       startTime time.Time,
+) {
+       // JoinHostPort brackets IPv6 literals; a plain "%s:%s" would produce an
+       // address that cannot be dialed.
+       seedAddr := net.JoinHostPort(strings.Trim(config.Host, "[]"), config.Port)
+
+       opts := &redis.ClusterOptions{
+               Addrs:       []string{seedAddr},
+               Username:    config.Username,
+               Password:    config.Password,
+               DialTimeout: timeout,
+               ReadTimeout: timeout,
+       }
+       if dialer != nil {
+               opts.Dialer = dialer
+       }
+
+       client := redis.NewClusterClient(opts)
+       defer client.Close()
+
+       var wg sync.WaitGroup
+       var mu sync.Mutex // guards response.Values, which the node goroutines append to
+
+       collectNode := func(nodeClient *redis.Client) {
+               defer wg.Done()
+               nodeAddr := nodeClient.Options().Addr
+
+               info, err := rc.fetchRedisInfo(ctx, nodeClient, config.Pattern)
+               if err != nil {
+                       rc.logger.Error(err, "failed to collect redis cluster node metrics", "addr", nodeAddr)
+                       return
+               }
+
+               elapsed := time.Since(startTime).Milliseconds()
+               parseMap := rc.parseInfo(info)
+               parseMap[strings.ToLower(ResponseTime)] = strconv.FormatInt(elapsed, 10)
+               parseMap[strings.ToLower(Identity)] = nodeAddr
+
+               // If collecting cluster info specifically, merge CLUSTER INFO in.
+               // A failure here is logged rather than swallowed; the row is still
+               // emitted with whatever INFO returned.
+               if metrics.Name == ClusterInfo {
+                       clusterInfo, err := nodeClient.ClusterInfo(ctx).Result()
+                       if err != nil {
+                               rc.logger.Error(err, "failed to collect redis cluster info", "addr", nodeAddr)
+                       } else {
+                               for k, v := range rc.parseInfo(clusterInfo) {
+                                       parseMap[k] = v
+                               }
+                       }
+               }
+
+               mu.Lock()
+               rc.addValueRow(response, metrics.AliasFields, parseMap)
+               mu.Unlock()
+       }
+
+       // spawn starts one collection goroutine per visited node. The callback's
+       // own context is unused; collectNode runs under the outer ctx.
+       spawn := func(_ context.Context, nodeClient *redis.Client) error {
+               wg.Add(1)
+               go collectNode(nodeClient)
+               return nil
+       }
+
+       // Iterate over masters
+       if err := client.ForEachMaster(ctx, spawn); err != nil {
+               rc.logger.Error(err, "failed to iterate masters")
+       }
+
+       // Iterate over slaves
+       if err := client.ForEachSlave(ctx, spawn); err != nil {
+               rc.logger.Error(err, "failed to iterate slaves")
+       }
+
+       wg.Wait()
+
+       if len(response.Values) == 0 {
+               response.Code = consts.CollectFail
+               response.Msg = "no data collected from cluster"
+       }
+}
+
+// fetchRedisInfo runs the command selected by pattern on client and returns
+// its raw text reply:
+//   - a pattern starting with "cluster" (case-insensitive) runs CLUSTER INFO;
+//   - an empty pattern or the cluster marker "3" runs INFO with no section;
+//   - any other pattern is passed to INFO as the section name.
+func (rc *RedisCollector) fetchRedisInfo(ctx context.Context, client *redis.Client, pattern string) (string, error) {
+       switch {
+       case strings.HasPrefix(strings.ToLower(pattern), ClusterInfo):
+               return client.ClusterInfo(ctx).Result()
+       case pattern == "" || pattern == ClusterPattern:
+               return client.Info(ctx).Result()
+       default:
+               return client.Info(ctx, pattern).Result()
+       }
+}
+
+// addValueRow appends one row to response.Values with a column per entry of
+// aliasFields, in order. Each alias is looked up lower-cased in parseMap;
+// aliases without a value get NullValue.
+func (rc *RedisCollector) addValueRow(response *jobtypes.CollectRepMetricsData, aliasFields []string, parseMap map[string]string) {
+       columns := make([]string, 0, len(aliasFields))
+       for _, alias := range aliasFields {
+               cell, found := parseMap[strings.ToLower(alias)]
+               if !found {
+                       cell = NullValue
+               }
+               columns = append(columns, cell)
+       }
+       response.Values = append(response.Values, jobtypes.ValueRow{Columns: columns})
+}
+
+// parseInfo turns "key:value" text, one pair per line, into a map.
+// Blank lines, lines starting with "#" and lines without a colon are skipped.
+// Keys are trimmed and lower-cased; values are trimmed and keep everything
+// after the first colon.
+func (rc *RedisCollector) parseInfo(info string) map[string]string {
+       parsed := make(map[string]string)
+       for _, raw := range strings.Split(info, "\n") {
+               entry := strings.TrimSpace(raw)
+               if entry == "" || strings.HasPrefix(entry, "#") {
+                       continue
+               }
+               name, val, found := strings.Cut(entry, ":")
+               if !found {
+                       continue
+               }
+               parsed[strings.ToLower(strings.TrimSpace(name))] = strings.TrimSpace(val)
+       }
+       return parsed
+}
+
+// createRedisDialer creates a dialer function that supports SSH tunneling
+func (rc *RedisCollector) createRedisDialer(sshTunnel *jobtypes.SSHTunnel) 
(func(context.Context, string, string) (net.Conn, error), error) {
+       if sshTunnel == nil || sshTunnel.Enable != "true" {
+               return nil, nil
+       }
+
+       // Create SSH config
+       sshConfig := &jobtypes.SSHProtocol{
+               Host:     sshTunnel.Host,
+               Port:     sshTunnel.Port,
+               Username: sshTunnel.Username,
+               Password: sshTunnel.Password,
+       }
+
+       // Use common/ssh helper to create client config
+       clientConfig, err := sshhelper.CreateSSHClientConfig(sshConfig, 
rc.logger)
+       if err != nil {
+               return nil, err
+       }
+       clientConfig.Timeout = 30 * time.Second
+
+       return func(ctx context.Context, network, addr string) (net.Conn, 
error) {
+               // Connect to SSH server
+               sshClient, err := sshhelper.DialWithContext(ctx, "tcp", 
net.JoinHostPort(sshConfig.Host, sshConfig.Port), clientConfig)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to connect to ssh 
server: %w", err)
+               }
+
+               // Dial target via SSH
+               conn, err := sshClient.Dial(network, addr)
+               if err != nil {
+                       sshClient.Close()
+                       return nil, fmt.Errorf("failed to dial target via ssh: 
%w", err)
+               }
+
+               return &sshConnWrapper{Conn: conn, client: sshClient}, nil
+       }, nil
+}
+
+// sshConnWrapper pairs a connection dialed through an SSH client with that
+// client, so that closing the connection can also close the client.
+type sshConnWrapper struct {
+       net.Conn
+       // client is the SSH client the embedded Conn was dialed through.
+       client *ssh.Client
+}
+
+// Close closes the tunneled connection and then the SSH client behind it.
+// Both are always closed. The connection's error takes precedence; the SSH
+// client's close error is returned only when the connection closed cleanly,
+// so it is no longer silently discarded.
+func (w *sshConnWrapper) Close() error {
+       err := w.Conn.Close()
+       if w.client != nil {
+               if clientErr := w.client.Close(); err == nil {
+                       err = clientErr
+               }
+       }
+       return err
+}
diff --git a/internal/collector/basic/redis/redis_collector_test.go 
b/internal/collector/basic/redis/redis_collector_test.go
new file mode 100644
index 0000000..21c0720
--- /dev/null
+++ b/internal/collector/basic/redis/redis_collector_test.go
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package redis
+
+import (
+       "os"
+       "strings"
+       "testing"
+
+       "github.com/alicebob/miniredis/v2"
+       "github.com/stretchr/testify/assert"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+       consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// TestRedisCollector_Protocol checks that the collector reports ProtocolRedis.
+func TestRedisCollector_Protocol(t *testing.T) {
+       collector := NewRedisCollector(logger.DefaultLogger(os.Stdout, loggertype.LogLevelDebug))
+       got := collector.Protocol()
+       assert.Equal(t, ProtocolRedis, got)
+}
+
+// TestRedisCollector_PreCheck walks PreCheck through nil metrics, a missing
+// Redis section, a missing host, a missing port, and a valid configuration.
+func TestRedisCollector_PreCheck(t *testing.T) {
+       collector := NewRedisCollector(logger.DefaultLogger(os.Stdout, loggertype.LogLevelDebug))
+
+       cases := []struct {
+               name    string
+               metrics *jobtypes.Metrics
+               wantErr string // empty means PreCheck must succeed
+       }{
+               {
+                       name:    "nil metrics",
+                       metrics: nil,
+                       wantErr: "redis configuration is required",
+               },
+               {
+                       name:    "missing Redis configuration",
+                       metrics: &jobtypes.Metrics{Name: "redis_test"},
+                       wantErr: "redis configuration is required",
+               },
+               {
+                       name: "missing host",
+                       metrics: &jobtypes.Metrics{
+                               Name:  "redis_test",
+                               Redis: &jobtypes.RedisProtocol{Port: "6379"},
+                       },
+                       wantErr: "redis host is required",
+               },
+               {
+                       name: "missing port",
+                       metrics: &jobtypes.Metrics{
+                               Name:  "redis_test",
+                               Redis: &jobtypes.RedisProtocol{Host: "localhost"},
+                       },
+                       wantErr: "redis port is required",
+               },
+               {
+                       name: "valid configuration",
+                       metrics: &jobtypes.Metrics{
+                               Name:  "redis_test",
+                               Redis: &jobtypes.RedisProtocol{Host: "localhost", Port: "6379"},
+                       },
+               },
+       }
+
+       for _, tc := range cases {
+               err := collector.PreCheck(tc.metrics)
+               if tc.wantErr == "" {
+                       assert.NoError(t, err, tc.name)
+                       continue
+               }
+               assert.Error(t, err, tc.name)
+               assert.Contains(t, err.Error(), tc.wantErr, tc.name)
+       }
+}
+
+// TestRedisCollector_Collect runs Collect against an in-process miniredis and
+// expects a single successful row whose responseTime column is populated.
+func TestRedisCollector_Collect(t *testing.T) {
+       server, err := miniredis.Run()
+       if err != nil {
+               t.Fatalf("Could not start miniredis: %s", err)
+       }
+       defer server.Close()
+
+       collector := NewRedisCollector(logger.DefaultLogger(os.Stdout, loggertype.LogLevelDebug))
+
+       // miniredis reports its listen address as "host:port".
+       hostPort := strings.Split(server.Addr(), ":")
+
+       metrics := &jobtypes.Metrics{
+               Name: "redis_test",
+               Redis: &jobtypes.RedisProtocol{
+                       Host: hostPort[0],
+                       Port: hostPort[1],
+               },
+               AliasFields: []string{"responseTime"},
+       }
+
+       result := collector.Collect(metrics)
+
+       // Overall outcome.
+       assert.Equal(t, consts.CollectSuccess, result.Code)
+       assert.Equal(t, "success", result.Msg)
+       assert.NotEmpty(t, result.Values)
+       assert.Equal(t, 1, len(result.Values))
+
+       // Exactly one column was requested.
+       columns := result.Values[0].Columns
+       assert.Equal(t, 1, len(columns))
+
+       responseTime := columns[0]
+       if responseTime == NullValue {
+               t.Logf("Failed to get responseTime. Value is NullValue.")
+       }
+
+       // responseTime should be present
+       assert.NotEqual(t, NullValue, responseTime)
+}
+
+func TestRedisCollector_Collect_Auth(t *testing.T) {
+       // Start miniredis with auth
+       s, err := miniredis.Run()
+       if err != nil {
+               t.Fatalf("Could not start miniredis: %s", err)
+       }
+       defer s.Close()
+       s.RequireAuth("password123")
+
+       log := logger.DefaultLogger(os.Stdout, loggertype.LogLevelDebug)
+       collector := NewRedisCollector(log)
+
+       addrParts := strings.Split(s.Addr(), ":")
+       host := addrParts[0]
+       port := addrParts[1]
+
+       // Test with correct password
+       metrics := &jobtypes.Metrics{
+               Name: "redis_auth_test",
+               Redis: &jobtypes.RedisProtocol{
+                       Host:     host,
+                       Port:     port,
+                       Password: "password123",
+               },
+               AliasFields: []string{"redis_version"},
+       }
+
+       result := collector.Collect(metrics)
+       assert.Equal(t, consts.CollectSuccess, result.Code)
+
+       // Test with incorrect password
+       metrics.Redis.Password = "wrong_password"
+       result = collector.Collect(metrics)
+       assert.Equal(t, consts.CollectFail, result.Code)
+}
+
+// TestRedisCollector_ParseInfo feeds parseInfo a sample INFO reply and checks
+// a handful of keys from both sections.
+func TestRedisCollector_ParseInfo(t *testing.T) {
+       collector := NewRedisCollector(logger.DefaultLogger(os.Stdout, loggertype.LogLevelDebug))
+
+       info := `# Server
+redis_version:6.2.6
+redis_git_sha1:00000000
+redis_git_dirty:0
+redis_build_id:c6f3693d1ac7923d
+redis_mode:standalone
+os:Linux 5.10.76-linuxkit x86_64
+arch_bits:64
+multiplexing_api:epoll
+atomicvar_api:atomic-builtin
+gcc_version:10.2.1
+process_id:1
+process_supervised:no
+run_id:d55354296781d79907a72527854463a2636629d6
+tcp_port:6379
+server_time_usec:1645518888888888
+uptime_in_seconds:3600
+uptime_in_days:1
+
+# Clients
+connected_clients:1
+cluster_enabled:0
+maxmemory:0`
+
+       parsed := collector.parseInfo(info)
+
+       expectations := []struct {
+               key  string
+               want string
+       }{
+               {"redis_version", "6.2.6"},
+               {"redis_mode", "standalone"},
+               {"connected_clients", "1"},
+               {"cluster_enabled", "0"},
+               {"uptime_in_days", "1"},
+       }
+       for _, exp := range expectations {
+               assert.Equal(t, exp.want, parsed[exp.key])
+       }
+}
diff --git a/internal/collector/common/transport/transport.go 
b/internal/collector/common/transport/transport.go
index 44058e3..f2edf73 100644
--- a/internal/collector/common/transport/transport.go
+++ b/internal/collector/common/transport/transport.go
@@ -18,260 +18,260 @@
 package transport
 
 import (
-       "context"
-       "encoding/json"
-       "fmt"
-       "os"
-
-       pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
-       clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
-       "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector"
-       configtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config"
-       loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
-       config "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config"
-       "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
-       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+  "context"
+  "encoding/json"
+  "fmt"
+  "os"
+
+  pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
+  clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
+  "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector"
+  configtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config"
+  loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+  "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config"
+  "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
+  "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
 const (
-       // DefaultManagerAddr is the default manager server address (Java Netty default port)
-       DefaultManagerAddr = "127.0.0.1:1158"
-       // DefaultProtocol is the default communication protocol for Java compatibility
-       DefaultProtocol = "netty"
-       // DefaultMode is the default operation mode
-       DefaultMode = "public"
-       // DefaultIdentity is the default collector identity
-       DefaultIdentity = "collector-go"
+  // DefaultManagerAddr is the default manager server address (Java Netty default port)
+  DefaultManagerAddr = "127.0.0.1:1158"
+  // DefaultProtocol is the default communication protocol for Java compatibility
+  DefaultProtocol = "netty"
+  // DefaultMode is the default operation mode
+  DefaultMode = "public"
+  // DefaultIdentity is the default collector identity
+  DefaultIdentity = "collector-go"
 )
 
 type Runner struct {
-       Config       *configtypes.CollectorConfig
-       client       transport.TransportClient
-       jobScheduler transport.JobScheduler
-       clrserver.Server
+  Config       *configtypes.CollectorConfig
+  client       transport.TransportClient
+  jobScheduler transport.JobScheduler
+  clrserver.Server
 }
 
 func New(cfg *configtypes.CollectorConfig) *Runner {
-       return &Runner{
-               Config: cfg,
-               Server: clrserver.Server{
-                       Logger: logger.Logger{}, // Will be initialized properly in Start method
-               },
-       }
+  return &Runner{
+    Config: cfg,
+    Server: clrserver.Server{
+      Logger: logger.Logger{}, // Will be initialized properly in Start method
+    },
+  }
 }
 
 // SetJobScheduler sets the job scheduler for the transport runner
 func (r *Runner) SetJobScheduler(scheduler transport.JobScheduler) {
-       r.jobScheduler = scheduler
+  r.jobScheduler = scheduler
 }
 
 // NewFromConfig creates a new transport runner from collector configuration
 func NewFromConfig(cfg *configtypes.CollectorConfig) *Runner {
-       if cfg == nil {
-               return nil
-       }
-       return New(cfg)
+  if cfg == nil {
+    return nil
+  }
+  return New(cfg)
 }
 
 // NewFromEnv creates a new transport runner from environment variables
 func NewFromEnv() *Runner {
-       envLoader := config.NewEnvConfigLoader()
-       cfg := envLoader.LoadFromEnv()
-       return NewFromConfig(cfg)
+  envLoader := config.NewEnvConfigLoader()
+  cfg := envLoader.LoadFromEnv()
+  return NewFromConfig(cfg)
 }
 
 // NewFromUnifiedConfig creates a new transport runner using unified configuration loading
 // It loads from file first, then overrides with environment variables
 func NewFromUnifiedConfig(cfgPath string) (*Runner, error) {
-       unifiedLoader := config.NewUnifiedConfigLoader(cfgPath)
-       cfg, err := unifiedLoader.Load()
-       if err != nil {
-               return nil, err
-       }
-       return NewFromConfig(cfg), nil
+  unifiedLoader := config.NewUnifiedConfigLoader(cfgPath)
+  cfg, err := unifiedLoader.Load()
+  if err != nil {
+    return nil, err
+  }
+  return NewFromConfig(cfg), nil
 }
 
 func (r *Runner) Start(ctx context.Context) error {
-       // 初始化 Logger 如果它还没有被设置
-       if r.Logger.IsZero() {
-               r.Logger = logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo)
-       }
-       r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", r.Info().Name)
-       r.Logger.Info("Starting transport client")
-
-       // 构建 server 地址
-       addr := fmt.Sprintf("%s:%s", r.Config.Collector.Manager.Host, r.Config.Collector.Manager.Port)
-       if addr == ":" {
-               // 如果配置为空,使用环境变量或默认值
-               if v := os.Getenv("MANAGER_ADDR"); v != "" {
-                       addr = v
-               } else {
-                       addr = DefaultManagerAddr
-               }
-       }
-
-       // 确定协议
-       protocol := r.Config.Collector.Manager.Protocol
-       if protocol == "" {
-               if v := os.Getenv("MANAGER_PROTOCOL"); v != "" {
-                       protocol = v
-               } else {
-                       protocol = DefaultProtocol
-               }
-       }
-
-       r.Logger.Info("Connecting to manager server", "addr", addr, "protocol", protocol)
-
-       // 创建客户端
-       factory := &transport.TransportClientFactory{}
-       client, err := factory.CreateClient(protocol, addr)
-       if err != nil {
-               r.Logger.Error(err, "Failed to create transport client")
-               return err
-       }
-
-       // Set the identity on the client if it supports it
-       identity := r.Config.Collector.Identity
-       if identity == "" {
-               identity = DefaultIdentity
-       }
-
-       if nettyClient, ok := client.(*transport.NettyClient); ok {
-               nettyClient.SetIdentity(identity)
-       }
-
-       r.client = client
-
-       // 设置事件处理器
-       switch c := client.(type) {
-       case *transport.GrpcClient:
-               c.SetEventHandler(func(event transport.Event) {
-                       switch event.Type {
-                       case transport.EventConnected:
-                               r.Logger.Info("Connected to manager gRPC server", "addr", event.Address)
-                               go r.sendOnlineMessage()
-                       case transport.EventDisconnected:
-                               r.Logger.Info("Disconnected from manager gRPC server", "addr", event.Address)
-                       case transport.EventConnectFailed:
-                               r.Logger.Error(event.Error, "Failed to connect to manager gRPC server", "addr", event.Address)
-                       }
-               })
-               // Register processors with job scheduler
-               if r.jobScheduler != nil {
-                       transport.RegisterDefaultProcessors(c, r.jobScheduler)
-                       r.Logger.Info("Registered gRPC processors with job scheduler")
-               } else {
-                       transport.RegisterDefaultProcessors(c, nil)
-                       r.Logger.Info("Registered gRPC processors without job scheduler")
-               }
-       case *transport.NettyClient:
-               c.SetEventHandler(func(event transport.Event) {
-                       switch event.Type {
-                       case transport.EventConnected:
-                               r.Logger.Info("Connected to manager netty server", "addr", event.Address)
-                               go r.sendOnlineMessage()
-                       case transport.EventDisconnected:
-                               r.Logger.Info("Disconnected from manager netty server", "addr", event.Address)
-                       case transport.EventConnectFailed:
-                               r.Logger.Error(event.Error, "Failed to connect to manager netty server", "addr", event.Address)
-                       }
-               })
-               // Register processors with job scheduler
-               if r.jobScheduler != nil {
-                       transport.RegisterDefaultNettyProcessors(c, r.jobScheduler)
-                       r.Logger.Info("Registered netty processors with job scheduler")
-               } else {
-                       transport.RegisterDefaultNettyProcessors(c, nil)
-                       r.Logger.Info("Registered netty processors without job scheduler")
-               }
-       }
-
-       if err := r.client.Start(); err != nil {
-               r.Logger.Error(err, "Failed to start transport client")
-               return err
-       }
-
-       // 创建新的context用于监控关闭信号
-       ctx, cancel := context.WithCancel(ctx)
-       defer cancel()
-
-       // 监听 ctx.Done 优雅关闭
-       go func() {
-               <-ctx.Done()
-               r.Logger.Info("Shutting down transport client...")
-               _ = r.client.Shutdown()
-       }()
-
-       // 阻塞直到 ctx.Done
-       <-ctx.Done()
-       return nil
+  // 初始化 Logger 如果它还没有被设置
+  if r.Logger.IsZero() {
+    r.Logger = logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo)
+  }
+  r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", r.Info().Name)
+  r.Logger.Info("Starting transport client")
+
+  // 构建 server 地址
+  addr := fmt.Sprintf("%s:%s", r.Config.Collector.Manager.Host, r.Config.Collector.Manager.Port)
+  if addr == ":" {
+    // 如果配置为空,使用环境变量或默认值
+    if v := os.Getenv("MANAGER_ADDR"); v != "" {
+      addr = v
+    } else {
+      addr = DefaultManagerAddr
+    }
+  }
+
+  // 确定协议
+  protocol := r.Config.Collector.Manager.Protocol
+  if protocol == "" {
+    if v := os.Getenv("MANAGER_PROTOCOL"); v != "" {
+      protocol = v
+    } else {
+      protocol = DefaultProtocol
+    }
+  }
+
+  r.Logger.Info("Connecting to manager server", "addr", addr, "protocol", protocol)
+
+  // 创建客户端
+  factory := &transport.TransportClientFactory{}
+  client, err := factory.CreateClient(protocol, addr)
+  if err != nil {
+    r.Logger.Error(err, "Failed to create transport client")
+    return err
+  }
+
+  // Set the identity on the client if it supports it
+  identity := r.Config.Collector.Identity
+  if identity == "" {
+    identity = DefaultIdentity
+  }
+
+  if nettyClient, ok := client.(*transport.NettyClient); ok {
+    nettyClient.SetIdentity(identity)
+  }
+
+  r.client = client
+
+  // 设置事件处理器
+  switch c := client.(type) {
+  case *transport.GrpcClient:
+    c.SetEventHandler(func(event transport.Event) {
+      switch event.Type {
+      case transport.EventConnected:
+        r.Logger.Info("Connected to manager gRPC server", "addr", event.Address)
+        go r.sendOnlineMessage()
+      case transport.EventDisconnected:
+        r.Logger.Info("Disconnected from manager gRPC server", "addr", event.Address)
+      case transport.EventConnectFailed:
+        r.Logger.Error(event.Error, "Failed to connect to manager gRPC server", "addr", event.Address)
+      }
+    })
+    // Register processors with job scheduler
+    if r.jobScheduler != nil {
+      transport.RegisterDefaultProcessors(c, r.jobScheduler)
+      r.Logger.Info("Registered gRPC processors with job scheduler")
+    } else {
+      transport.RegisterDefaultProcessors(c, nil)
+      r.Logger.Info("Registered gRPC processors without job scheduler")
+    }
+  case *transport.NettyClient:
+    c.SetEventHandler(func(event transport.Event) {
+      switch event.Type {
+      case transport.EventConnected:
+        r.Logger.Info("Connected to manager netty server", "addr", event.Address)
+        go r.sendOnlineMessage()
+      case transport.EventDisconnected:
+        r.Logger.Info("Disconnected from manager netty server", "addr", event.Address)
+      case transport.EventConnectFailed:
+        r.Logger.Error(event.Error, "Failed to connect to manager netty server", "addr", event.Address)
+      }
+    })
+    // Register processors with job scheduler
+    if r.jobScheduler != nil {
+      transport.RegisterDefaultNettyProcessors(c, r.jobScheduler)
+      r.Logger.Info("Registered netty processors with job scheduler")
+    } else {
+      transport.RegisterDefaultNettyProcessors(c, nil)
+      r.Logger.Info("Registered netty processors without job scheduler")
+    }
+  }
+
+  if err := r.client.Start(); err != nil {
+    r.Logger.Error(err, "Failed to start transport client")
+    return err
+  }
+
+  // 创建新的context用于监控关闭信号
+  ctx, cancel := context.WithCancel(ctx)
+  defer cancel()
+
+  // 监听 ctx.Done 优雅关闭
+  go func() {
+    <-ctx.Done()
+    r.Logger.Info("Shutting down transport client...")
+    _ = r.client.Shutdown()
+  }()
+
+  // 阻塞直到 ctx.Done
+  <-ctx.Done()
+  return nil
 }
 
 func (r *Runner) sendOnlineMessage() {
-       if r.client != nil && r.client.IsStarted() {
-               // Use the configured identity
-               identity := r.Config.Collector.Identity
-               if identity == "" {
-                       identity = DefaultIdentity
-               }
-
-               // Create CollectorInfo JSON structure as expected by Java server
-               mode := r.Config.Collector.Mode
-               if mode == "" {
-                       mode = DefaultMode // Default mode as in Java version
-               }
-
-               collectorInfo := map[string]interface{}{
-                       "name":    identity,
-                       "ip":      "", // Let server detect IP
-                       "version": "1.0.0",
-                       "mode":    mode,
-               }
-
-               // Convert to JSON bytes
-               jsonData, err := json.Marshal(collectorInfo)
-               if err != nil {
-                       r.Logger.Error(err, "Failed to marshal collector info to JSON")
-                       return
-               }
-
-               onlineMsg := &pb.Message{
-                       Type:      pb.MessageType_GO_ONLINE,
-                       Direction: pb.Direction_REQUEST,
-                       Identity:  identity,
-                       Msg:       jsonData,
-               }
-
-               r.Logger.Info("Sending online message", "identity", identity, "type", onlineMsg.Type)
-
-               if err := r.client.SendMsg(onlineMsg); err != nil {
-                       r.Logger.Error(err, "Failed to send online message", "identity", identity)
-               } else {
-                       r.Logger.Info("Online message sent successfully", "identity", identity)
-               }
-       }
+  if r.client != nil && r.client.IsStarted() {
+    // Use the configured identity
+    identity := r.Config.Collector.Identity
+    if identity == "" {
+      identity = DefaultIdentity
+    }
+
+    // Create CollectorInfo JSON structure as expected by Java server
+    mode := r.Config.Collector.Mode
+    if mode == "" {
+      mode = DefaultMode // Default mode as in Java version
+    }
+
+    collectorInfo := map[string]interface{}{
+      "name":    identity,
+      "ip":      "", // Let server detect IP
+      "version": "1.0.0",
+      "mode":    mode,
+    }
+
+    // Convert to JSON bytes
+    jsonData, err := json.Marshal(collectorInfo)
+    if err != nil {
+      r.Logger.Error(err, "Failed to marshal collector info to JSON")
+      return
+    }
+
+    onlineMsg := &pb.Message{
+      Type:      pb.MessageType_GO_ONLINE,
+      Direction: pb.Direction_REQUEST,
+      Identity:  identity,
+      Msg:       jsonData,
+    }
+
+    r.Logger.Info("Sending online message", "identity", identity, "type", onlineMsg.Type)
+
+    if err := r.client.SendMsg(onlineMsg); err != nil {
+      r.Logger.Error(err, "Failed to send online message", "identity", identity)
+    } else {
+      r.Logger.Info("Online message sent successfully", "identity", identity)
+    }
+  }
 }
 
 func (r *Runner) Info() collector.Info {
-       return collector.Info{
-               Name: "transport",
-       }
+  return collector.Info{
+    Name: "transport",
+  }
 }
 
 func (r *Runner) Close() error {
-       r.Logger.Info("transport close...")
-       if r.client != nil {
-               _ = r.client.Shutdown()
-       }
-       return nil
+  r.Logger.Info("transport close...")
+  if r.client != nil {
+    _ = r.client.Shutdown()
+  }
+  return nil
 }
 
 // GetClient returns the transport client (for testing and advanced usage)
 func (r *Runner) GetClient() transport.TransportClient {
-       return r.client
+  return r.client
 }
 
 // IsConnected returns whether the client is connected and started
 func (r *Runner) IsConnected() bool {
-       return r.client != nil && r.client.IsStarted()
+  return r.client != nil && r.client.IsStarted()
 }
diff --git a/internal/collector/common/types/job/metrics_types.go b/internal/collector/common/types/job/metrics_types.go
index 293811a..b783c21 100644
--- a/internal/collector/common/types/job/metrics_types.go
+++ b/internal/collector/common/types/job/metrics_types.go
@@ -259,11 +259,13 @@ type JMXProtocol struct {
 
 // RedisProtocol represents Redis protocol configuration
 type RedisProtocol struct {
-       Host     string `json:"host"`
-       Port     int    `json:"port"`
-       Password string `json:"password"`
-       Pattern  string `json:"pattern"`
-       Timeout  int    `json:"timeout"`
+       Host      string     `json:"host"`
+       Port      string     `json:"port"`
+       Username  string     `json:"username"`
+       Password  string     `json:"password"`
+       Pattern   string     `json:"pattern"`
+       Timeout   string     `json:"timeout"`
+       SSHTunnel *SSHTunnel `json:"sshTunnel,omitempty"`
 }
 
 // MongoDBProtocol represents MongoDB protocol configuration
diff --git a/internal/util/param/param_replacer.go b/internal/util/param/param_replacer.go
index 5e95eed..0aeec7d 100644
--- a/internal/util/param/param_replacer.go
+++ b/internal/util/param/param_replacer.go
@@ -148,6 +148,11 @@ func (r *Replacer) ReplaceMetricsParams(metrics *jobtypes.Metrics, paramMap map[
                r.replaceHTTPParams(metrics.HTTP, paramMap)
        }
 
+       // 4. Redis
+       if metrics.Redis != nil {
+               r.replaceRedisParams(metrics.Redis, paramMap)
+       }
+
        // Replace parameters in basic metrics fields
        if err := r.replaceBasicMetricsParams(metrics, paramMap); err != nil {
                return fmt.Errorf("failed to replace basic metrics params: %w", err)
@@ -204,6 +209,22 @@ func (r *Replacer) replaceHTTPParams(http *jobtypes.HTTPProtocol, paramMap map[s
        }
 }
 
+// replaceRedisParams specific replacement logic for RedisProtocol struct
+func (r *Replacer) replaceRedisParams(redis *jobtypes.RedisProtocol, paramMap map[string]string) {
+       redis.Host = r.replaceParamPlaceholders(redis.Host, paramMap)
+       redis.Port = r.replaceParamPlaceholders(redis.Port, paramMap)
+       redis.Username = r.replaceParamPlaceholders(redis.Username, paramMap)
+       redis.Password = r.replaceParamPlaceholders(redis.Password, paramMap)
+       redis.Pattern = r.replaceParamPlaceholders(redis.Pattern, paramMap)
+       redis.Timeout = r.replaceParamPlaceholders(redis.Timeout, paramMap)
+       if redis.SSHTunnel != nil {
+               redis.SSHTunnel.Host = r.replaceParamPlaceholders(redis.SSHTunnel.Host, paramMap)
+               redis.SSHTunnel.Port = r.replaceParamPlaceholders(redis.SSHTunnel.Port, paramMap)
+               redis.SSHTunnel.Username = r.replaceParamPlaceholders(redis.SSHTunnel.Username, paramMap)
+               redis.SSHTunnel.Password = r.replaceParamPlaceholders(redis.SSHTunnel.Password, paramMap)
+       }
+}
+
 // replaceProtocolParams replaces parameters in any protocol configuration defined as interface{}
 func (r *Replacer) replaceProtocolParams(protocolInterface *interface{}, paramMap map[string]string) error {
        if *protocolInterface == nil {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to