This is an automated email from the ASF dual-hosted git repository. shown pushed a commit to branch 1130-yuluo/opt in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
commit c7e6128252fde929ebe301ce61cf109e7523fe6a Author: yuluo-yx <[email protected]> AuthorDate: Sun Nov 30 15:00:18 2025 +0800 feat: add redis collector impl Signed-off-by: yuluo-yx <[email protected]> --- go.mod | 6 +- go.sum | 12 + internal/collector/basic/init.go | 8 +- internal/collector/basic/jmx/.keep | 0 internal/collector/basic/redis/.keep | 0 internal/collector/basic/redis/redis_collector.go | 345 +++++++++++++++++ .../collector/basic/redis/redis_collector_test.go | 200 ++++++++++ internal/collector/common/transport/transport.go | 420 ++++++++++----------- .../collector/common/types/job/metrics_types.go | 12 +- internal/util/param/param_replacer.go | 21 ++ 10 files changed, 805 insertions(+), 219 deletions(-) diff --git a/go.mod b/go.mod index b924b9a..0891533 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module hertzbeat.apache.org/hertzbeat-collector-go go 1.24.6 require ( + github.com/alicebob/miniredis/v2 v2.35.0 github.com/apache/arrow/go/v13 v13.0.0 github.com/expr-lang/expr v1.16.9 github.com/go-logr/logr v1.4.3 @@ -11,6 +12,8 @@ require ( github.com/lib/pq v1.10.9 github.com/microsoft/go-mssqldb v1.9.3 github.com/prometheus/client_golang v1.23.0 + github.com/prometheus/common v0.65.0 + github.com/redis/go-redis/v9 v9.17.1 github.com/sijms/go-ora/v2 v2.9.0 github.com/spf13/cobra v1.10.1 github.com/stretchr/testify v1.10.0 @@ -26,6 +29,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/goccy/go-json v0.10.0 // indirect github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect @@ -38,10 +42,10 @@ require ( github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.6.2 // indirect - github.com/prometheus/common v0.65.0 // indirect github.com/prometheus/procfs v0.16.1 // indirect github.com/rogpeppe/go-internal v1.13.1 // indirect github.com/spf13/pflag v1.0.9 // indirect + github.com/yuin/gopher-lua v1.1.1 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/mod v0.25.0 // indirect diff --git a/go.sum b/go.sum index b54c994..e28774c 100644 --- a/go.sum +++ b/go.sum @@ -12,15 +12,23 @@ github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.1.1 h1:bFWuo github.com/Azure/azure-sdk-for-go/sdk/security/keyvault/internal v1.1.1/go.mod h1:Vih/3yc6yac2JzU4hzpaDupBJP0Flaia9rXXrU8xyww= github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2 h1:oygO0locgZJe7PpYPXT5A29ZkwJaPqcva7BVeemZOZs= github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= +github.com/alicebob/miniredis/v2 v2.35.0 h1:QwLphYqCEAo1eu1TqPRN2jgVMPBweeQcR21jeqDCONI= +github.com/alicebob/miniredis/v2 v2.35.0/go.mod h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM= github.com/apache/arrow/go/v13 v13.0.0 h1:kELrvDQuKZo8csdWYqBQfyi431x6Zs/YJTEgUuSVcWk= github.com/apache/arrow/go/v13 v13.0.0/go.mod h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/expr-lang/expr v1.16.9 h1:WUAzmR0JNI9JCiF0/ewwHB1gmcGw5wW7nWt8gc6PpCI= github.com/expr-lang/expr v1.16.9/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= @@ -79,6 +87,8 @@ github.com/prometheus/common v0.65.0 h1:QDwzd+G1twt//Kwj/Ww6E9FQq1iVMmODnILtW1t2 github.com/prometheus/common v0.65.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8= github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= +github.com/redis/go-redis/v9 v9.17.1 h1:7tl732FjYPRT9H9aNfyTwKg9iTETjWjGKEJ2t/5iWTs= +github.com/redis/go-redis/v9 v9.17.1/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -90,6 +100,8 @@ github.com/spf13/pflag v1.0.9 h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY= github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= +github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= diff --git a/internal/collector/basic/init.go b/internal/collector/basic/init.go index de47b58..3145b48 100644 --- a/internal/collector/basic/init.go +++ b/internal/collector/basic/init.go @@ -22,6 +22,7 @@ package basic import ( "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/database" "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/http" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/redis" "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/ssh" "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" @@ -48,10 +49,11 @@ func init() { return http.NewHTTPCollector(logger) }) + strategy.RegisterFactory("redis", func(logger logger.Logger) strategy.Collector { + return redis.NewRedisCollector(logger) + }) + // More protocols can be added here in the future: - // strategy.RegisterFactory("redis", func(logger logger.Logger) strategy.Collector { - // return redis.NewRedisCollector(logger) - // }) } // InitializeAllCollectors initializes all registered collectors diff --git a/internal/collector/basic/jmx/.keep b/internal/collector/basic/jmx/.keep deleted file mode 100644 index e69de29..0000000 diff --git a/internal/collector/basic/redis/.keep b/internal/collector/basic/redis/.keep deleted file mode 100644 index e69de29..0000000 diff --git a/internal/collector/basic/redis/redis_collector.go b/internal/collector/basic/redis/redis_collector.go new file mode 100644 index 0000000..6341815 --- /dev/null +++ b/internal/collector/basic/redis/redis_collector.go @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package redis + +import ( + "context" + "fmt" + "net" + "strconv" + "strings" + "sync" + "time" + + "github.com/redis/go-redis/v9" + "golang.org/x/crypto/ssh" + + sshhelper "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/ssh" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +const ( + ProtocolRedis = "redis" + ResponseTime = "responseTime" + NullValue = consts.NULL_VALUE + ClusterPattern = "3" + ClusterInfo = "cluster" + Identity = "identity" +) + +// RedisCollector implements Redis metrics collection +type RedisCollector struct { + logger logger.Logger +} + +// NewRedisCollector creates a new Redis collector +func NewRedisCollector(logger logger.Logger) *RedisCollector { + return &RedisCollector{ + logger: logger.WithName("redis-collector"), + } +} + +// Protocol returns the protocol this collector supports +func (rc *RedisCollector) Protocol() string { + return ProtocolRedis +} + +// PreCheck validates the Redis metrics configuration +func (rc *RedisCollector) PreCheck(metrics *jobtypes.Metrics) error { + if metrics == nil || metrics.Redis == nil { + return fmt.Errorf("redis configuration is required") + } + if metrics.Redis.Host == "" { + return fmt.Errorf("redis host is required") + } + if metrics.Redis.Port == "" { + return fmt.Errorf("redis port is required") + } + return nil +} + +// Collect performs Redis metrics collection +func (rc *RedisCollector) Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRepMetricsData { + startTime := time.Now() + + // Create response structure + response := &jobtypes.CollectRepMetricsData{ + Metrics: metrics.Name, + Time: startTime.UnixMilli(), + Code: consts.CollectSuccess, + Msg: "success", + Fields: make([]jobtypes.Field, 0), + Values: make([]jobtypes.ValueRow, 0), + } + + // Initialize fields + for _, alias := range metrics.AliasFields { + response.Fields = append(response.Fields, jobtypes.Field{ + Field: alias, + Type: 1, + }) + } + + redisConfig := metrics.Redis + + // Parse timeout + timeout := 30 * time.Second + if redisConfig.Timeout != "" { + if t, err := strconv.Atoi(redisConfig.Timeout); err == nil { + timeout = time.Duration(t) * time.Millisecond + } else if d, err := time.ParseDuration(redisConfig.Timeout); err == nil { + timeout = d + } + } + + // Create dialer (handles SSH tunnel if configured) + dialer, err := rc.createRedisDialer(redisConfig.SSHTunnel) + if err != nil { + rc.logger.Error(err, "failed to create redis dialer") + response.Code = consts.CollectUnConnectable + response.Msg = err.Error() + return response + } + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + // Check if cluster mode + isCluster := redisConfig.Pattern == ClusterPattern || strings.HasPrefix(strings.ToLower(redisConfig.Pattern), ClusterInfo) + + if isCluster { + rc.collectCluster(ctx, metrics, redisConfig, dialer, timeout, response, startTime) + } else { + rc.collectSingle(ctx, metrics, redisConfig, dialer, timeout, response, startTime) + } + + return response +} + +func (rc *RedisCollector) collectSingle(ctx context.Context, metrics *jobtypes.Metrics, config *jobtypes.RedisProtocol, dialer func(context.Context, string, string) (net.Conn, error), timeout time.Duration, response *jobtypes.CollectRepMetricsData, startTime time.Time) { + opts := &redis.Options{ + Addr: fmt.Sprintf("%s:%s", config.Host, config.Port), + Username: config.Username, + Password: config.Password, + DialTimeout: timeout, + ReadTimeout: timeout, + } + if dialer != nil { + opts.Dialer = dialer + } + + client := redis.NewClient(opts) + defer client.Close() + + info, err := rc.fetchRedisInfo(ctx, client, config.Pattern) + if err != nil { + rc.logger.Error(err, "failed to collect redis metrics") + response.Code = consts.CollectFail + response.Msg = err.Error() + return + } + + responseTime := time.Since(startTime).Milliseconds() + parseMap := rc.parseInfo(info) + parseMap[strings.ToLower(ResponseTime)] = fmt.Sprintf("%d", responseTime) + parseMap[strings.ToLower(Identity)] = fmt.Sprintf("%s:%s", config.Host, config.Port) + + rc.addValueRow(response, metrics.AliasFields, parseMap) +} + +func (rc *RedisCollector) collectCluster(ctx context.Context, metrics *jobtypes.Metrics, config *jobtypes.RedisProtocol, dialer func(context.Context, string, string) (net.Conn, error), timeout time.Duration, response *jobtypes.CollectRepMetricsData, startTime time.Time) { + opts := &redis.ClusterOptions{ + Addrs: []string{fmt.Sprintf("%s:%s", config.Host, config.Port)}, + Username: config.Username, + Password: config.Password, + DialTimeout: timeout, + ReadTimeout: timeout, + } + if dialer != nil { + opts.Dialer = dialer + } + + client := redis.NewClusterClient(opts) + defer client.Close() + + // Collect from all masters + var wg sync.WaitGroup + var mu sync.Mutex + + collectNode := func(nodeClient *redis.Client) { + defer wg.Done() + info, err := rc.fetchRedisInfo(ctx, nodeClient, config.Pattern) + if err != nil { + rc.logger.Error(err, "failed to collect redis cluster node metrics", "addr", nodeClient.Options().Addr) + return + } + + responseTime := time.Since(startTime).Milliseconds() + parseMap := rc.parseInfo(info) + parseMap[strings.ToLower(ResponseTime)] = fmt.Sprintf("%d", responseTime) + parseMap[strings.ToLower(Identity)] = nodeClient.Options().Addr + + // If collecting cluster info specifically + if metrics.Name == ClusterInfo { + clusterInfo, err := nodeClient.ClusterInfo(ctx).Result() + if err == nil { + clusterMap := rc.parseInfo(clusterInfo) + for k, v := range clusterMap { + parseMap[k] = v + } + } + } + + mu.Lock() + rc.addValueRow(response, metrics.AliasFields, parseMap) + mu.Unlock() + } + + // Iterate over masters + err := client.ForEachMaster(ctx, func(ctx context.Context, nodeClient *redis.Client) error { + wg.Add(1) + go collectNode(nodeClient) + return nil + }) + if err != nil { + rc.logger.Error(err, "failed to iterate masters") + } + + // Iterate over slaves + err = client.ForEachSlave(ctx, func(ctx context.Context, nodeClient *redis.Client) error { + wg.Add(1) + go collectNode(nodeClient) + return nil + }) + if err != nil { + rc.logger.Error(err, "failed to iterate slaves") + } + + wg.Wait() + + if len(response.Values) == 0 { + response.Code = consts.CollectFail + response.Msg = "no data collected from cluster" + } +} + +func (rc *RedisCollector) fetchRedisInfo(ctx context.Context, client *redis.Client, pattern string) (string, error) { + if pattern != "" && strings.HasPrefix(strings.ToLower(pattern), ClusterInfo) && pattern != ClusterPattern { + return client.ClusterInfo(ctx).Result() + } + + section := pattern + if section == ClusterPattern { + section = "" // Default info for pattern "3" + } + + if section == "" { + return client.Info(ctx).Result() + } + return client.Info(ctx, section).Result() +} + +func (rc *RedisCollector) addValueRow(response *jobtypes.CollectRepMetricsData, aliasFields []string, parseMap map[string]string) { + valueRow := jobtypes.ValueRow{ + Columns: make([]string, len(aliasFields)), + } + + for i, alias := range aliasFields { + if val, ok := parseMap[strings.ToLower(alias)]; ok { + valueRow.Columns[i] = val + } else { + valueRow.Columns[i] = NullValue + } + } + response.Values = append(response.Values, valueRow) +} + +// parseInfo parses Redis INFO command output into a map +func (rc *RedisCollector) parseInfo(info string) map[string]string { + result := make(map[string]string) + lines := strings.Split(info, "\n") + for _, line := range lines { + line = strings.TrimSpace(line) + if line == "" || strings.HasPrefix(line, "#") { + continue + } + parts := strings.SplitN(line, ":", 2) + if len(parts) == 2 { + key := strings.ToLower(strings.TrimSpace(parts[0])) + value := strings.TrimSpace(parts[1]) + result[key] = value + } + } + return result +} + +// createRedisDialer creates a dialer function that supports SSH tunneling +func (rc *RedisCollector) createRedisDialer(sshTunnel *jobtypes.SSHTunnel) (func(context.Context, string, string) (net.Conn, error), error) { + if sshTunnel == nil || sshTunnel.Enable != "true" { + return nil, nil + } + + // Create SSH config + sshConfig := &jobtypes.SSHProtocol{ + Host: sshTunnel.Host, + Port: sshTunnel.Port, + Username: sshTunnel.Username, + Password: sshTunnel.Password, + } + + // Use common/ssh helper to create client config + clientConfig, err := sshhelper.CreateSSHClientConfig(sshConfig, rc.logger) + if err != nil { + return nil, err + } + clientConfig.Timeout = 30 * time.Second + + return func(ctx context.Context, network, addr string) (net.Conn, error) { + // Connect to SSH server + sshClient, err := sshhelper.DialWithContext(ctx, "tcp", net.JoinHostPort(sshConfig.Host, sshConfig.Port), clientConfig) + if err != nil { + return nil, fmt.Errorf("failed to connect to ssh server: %w", err) + } + + // Dial target via SSH + conn, err := sshClient.Dial(network, addr) + if err != nil { + sshClient.Close() + return nil, fmt.Errorf("failed to dial target via ssh: %w", err) + } + + return &sshConnWrapper{Conn: conn, client: sshClient}, nil + }, nil +} + +type sshConnWrapper struct { + net.Conn + client *ssh.Client +} + +func (w *sshConnWrapper) Close() error { + err := w.Conn.Close() + if w.client != nil { + w.client.Close() + } + return err +} diff --git a/internal/collector/basic/redis/redis_collector_test.go b/internal/collector/basic/redis/redis_collector_test.go new file mode 100644 index 0000000..21c0720 --- /dev/null +++ b/internal/collector/basic/redis/redis_collector_test.go @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package redis + +import ( + "os" + "strings" + "testing" + + "github.com/alicebob/miniredis/v2" + "github.com/stretchr/testify/assert" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + loggertype "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" + consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +func TestRedisCollector_Protocol(t *testing.T) { + log := logger.DefaultLogger(os.Stdout, loggertype.LogLevelDebug) + collector := NewRedisCollector(log) + assert.Equal(t, ProtocolRedis, collector.Protocol()) +} + +func TestRedisCollector_PreCheck(t *testing.T) { + log := logger.DefaultLogger(os.Stdout, loggertype.LogLevelDebug) + collector := NewRedisCollector(log) + + // Test nil metrics + err := collector.PreCheck(nil) + assert.Error(t, err) + assert.Contains(t, err.Error(), "redis configuration is required") + + // Test missing Redis configuration + metrics := &jobtypes.Metrics{ + Name: "redis_test", + } + err = collector.PreCheck(metrics) + assert.Error(t, err) + assert.Contains(t, err.Error(), "redis configuration is required") + + // Test missing host + metrics.Redis = &jobtypes.RedisProtocol{ + Port: "6379", + } + err = collector.PreCheck(metrics) + assert.Error(t, err) + assert.Contains(t, err.Error(), "redis host is required") + + // Test missing port + metrics.Redis = &jobtypes.RedisProtocol{ + Host: "localhost", + } + err = collector.PreCheck(metrics) + assert.Error(t, err) + assert.Contains(t, err.Error(), "redis port is required") + + // Test valid configuration + metrics.Redis = &jobtypes.RedisProtocol{ + Host: "localhost", + Port: "6379", + } + err = collector.PreCheck(metrics) + assert.NoError(t, err) +} + +func TestRedisCollector_Collect(t *testing.T) { + // Start miniredis + s, err := miniredis.Run() + if err != nil { + t.Fatalf("Could not start miniredis: %s", err) + } + defer s.Close() + + log := logger.DefaultLogger(os.Stdout, loggertype.LogLevelDebug) + collector := NewRedisCollector(log) + + // Parse host and port from miniredis address + addrParts := strings.Split(s.Addr(), ":") + host := addrParts[0] + port := addrParts[1] + + metrics := &jobtypes.Metrics{ + Name: "redis_test", + Redis: &jobtypes.RedisProtocol{ + Host: host, + Port: port, + }, + AliasFields: []string{"responseTime"}, + } + + // Execute Collect + result := collector.Collect(metrics) + + // Verify result + assert.Equal(t, consts.CollectSuccess, result.Code) + assert.Equal(t, "success", result.Msg) + assert.NotEmpty(t, result.Values) + assert.Equal(t, 1, len(result.Values)) + + // Verify collected values + values := result.Values[0].Columns + assert.Equal(t, 1, len(values)) + + if values[0] == NullValue { + t.Logf("Failed to get responseTime. Value is NullValue.") + } + + // responseTime should be present + assert.NotEqual(t, NullValue, values[0]) +} + +func TestRedisCollector_Collect_Auth(t *testing.T) { + // Start miniredis with auth + s, err := miniredis.Run() + if err != nil { + t.Fatalf("Could not start miniredis: %s", err) + } + defer s.Close() + s.RequireAuth("password123") + + log := logger.DefaultLogger(os.Stdout, loggertype.LogLevelDebug) + collector := NewRedisCollector(log) + + addrParts := strings.Split(s.Addr(), ":") + host := addrParts[0] + port := addrParts[1] + + // Test with correct password + metrics := &jobtypes.Metrics{ + Name: "redis_auth_test", + Redis: &jobtypes.RedisProtocol{ + Host: host, + Port: port, + Password: "password123", + }, + AliasFields: []string{"redis_version"}, + } + + result := collector.Collect(metrics) + assert.Equal(t, consts.CollectSuccess, result.Code) + + // Test with incorrect password + metrics.Redis.Password = "wrong_password" + result = collector.Collect(metrics) + assert.Equal(t, consts.CollectFail, result.Code) +} + +func TestRedisCollector_ParseInfo(t *testing.T) { + log := logger.DefaultLogger(os.Stdout, loggertype.LogLevelDebug) + collector := NewRedisCollector(log) + + info := `# Server +redis_version:6.2.6 +redis_git_sha1:00000000 +redis_git_dirty:0 +redis_build_id:c6f3693d1ac7923d +redis_mode:standalone +os:Linux 5.10.76-linuxkit x86_64 +arch_bits:64 +multiplexing_api:epoll +atomicvar_api:atomic-builtin +gcc_version:10.2.1 +process_id:1 +process_supervised:no +run_id:d55354296781d79907a72527854463a2636629d6 +tcp_port:6379 +server_time_usec:1645518888888888 +uptime_in_seconds:3600 +uptime_in_days:1 + +# Clients +connected_clients:1 +cluster_enabled:0 +maxmemory:0` + + parsed := collector.parseInfo(info) + + assert.Equal(t, "6.2.6", parsed["redis_version"]) + assert.Equal(t, "standalone", parsed["redis_mode"]) + assert.Equal(t, "1", parsed["connected_clients"]) + assert.Equal(t, "0", parsed["cluster_enabled"]) + assert.Equal(t, "1", parsed["uptime_in_days"]) +} diff --git a/internal/collector/common/transport/transport.go b/internal/collector/common/transport/transport.go index 44058e3..f2edf73 100644 --- a/internal/collector/common/transport/transport.go +++ b/internal/collector/common/transport/transport.go @@ -18,260 +18,260 @@ package transport import ( - "context" - "encoding/json" - "fmt" - "os" - - pb "hertzbeat.apache.org/hertzbeat-collector-go/api" - clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector" - configtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config" - loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" - config "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" + "context" + "encoding/json" + "fmt" + "os" + + pb "hertzbeat.apache.org/hertzbeat-collector-go/api" + clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector" + configtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config" + loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) const ( - // DefaultManagerAddr is the default manager server address (Java Netty default port) - DefaultManagerAddr = "127.0.0.1:1158" - // DefaultProtocol is the default communication protocol for Java compatibility - DefaultProtocol = "netty" - // DefaultMode is the default operation mode - DefaultMode = "public" - // DefaultIdentity is the default collector identity - DefaultIdentity = "collector-go" + // DefaultManagerAddr is the default manager server address (Java Netty default port) + DefaultManagerAddr = "127.0.0.1:1158" + // DefaultProtocol is the default communication protocol for Java compatibility + DefaultProtocol = "netty" + // DefaultMode is the default operation mode + DefaultMode = "public" + // DefaultIdentity is the default collector identity + DefaultIdentity = "collector-go" ) type Runner struct { - Config *configtypes.CollectorConfig - client transport.TransportClient - jobScheduler transport.JobScheduler - clrserver.Server + Config *configtypes.CollectorConfig + client transport.TransportClient + jobScheduler transport.JobScheduler + clrserver.Server } func New(cfg *configtypes.CollectorConfig) *Runner { - return &Runner{ - Config: cfg, - Server: clrserver.Server{ - Logger: logger.Logger{}, // Will be initialized properly in Start method - }, - } + return &Runner{ + Config: cfg, + Server: clrserver.Server{ + Logger: logger.Logger{}, // Will be initialized properly in Start method + }, + } } // SetJobScheduler sets the job scheduler for the transport runner func (r *Runner) SetJobScheduler(scheduler transport.JobScheduler) { - r.jobScheduler = scheduler + r.jobScheduler = scheduler } // NewFromConfig creates a new transport runner from collector configuration func NewFromConfig(cfg *configtypes.CollectorConfig) *Runner { - if cfg == nil { - return nil - } - return New(cfg) + if cfg == nil { + return nil + } + return New(cfg) } // NewFromEnv creates a new transport runner from environment variables func NewFromEnv() *Runner { - envLoader := config.NewEnvConfigLoader() - cfg := envLoader.LoadFromEnv() - return NewFromConfig(cfg) + envLoader := config.NewEnvConfigLoader() + cfg := envLoader.LoadFromEnv() + return NewFromConfig(cfg) } // NewFromUnifiedConfig creates a new transport runner using unified configuration loading // It loads from file first, then overrides with environment variables func NewFromUnifiedConfig(cfgPath string) (*Runner, error) { - unifiedLoader := config.NewUnifiedConfigLoader(cfgPath) - cfg, err := unifiedLoader.Load() - if err != nil { - return nil, err - } - return NewFromConfig(cfg), nil + unifiedLoader := config.NewUnifiedConfigLoader(cfgPath) + cfg, err := unifiedLoader.Load() + if err != nil { + return nil, err + } + return NewFromConfig(cfg), nil } func (r *Runner) Start(ctx context.Context) error { - // 初始化 Logger 如果它还没有被设置 - if r.Logger.IsZero() { - r.Logger = logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo) - } - r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", r.Info().Name) - r.Logger.Info("Starting transport client") - - // 构建 server 地址 - addr := fmt.Sprintf("%s:%s", r.Config.Collector.Manager.Host, r.Config.Collector.Manager.Port) - if addr == ":" { - // 如果配置为空,使用环境变量或默认值 - if v := os.Getenv("MANAGER_ADDR"); v != "" { - addr = v - } else { - addr = DefaultManagerAddr - } - } - - // 确定协议 - protocol := r.Config.Collector.Manager.Protocol - if protocol == "" { - if v := os.Getenv("MANAGER_PROTOCOL"); v != "" { - protocol = v - } else { - protocol = DefaultProtocol - } - } - - r.Logger.Info("Connecting to manager server", "addr", addr, "protocol", protocol) - - // 创建客户端 - factory := &transport.TransportClientFactory{} - client, err := factory.CreateClient(protocol, addr) - if err != nil { - r.Logger.Error(err, "Failed to create transport client") - return err - } - - // Set the identity on the client if it supports it - identity := r.Config.Collector.Identity - if identity == "" { - identity = DefaultIdentity - } - - if nettyClient, ok := client.(*transport.NettyClient); ok { - nettyClient.SetIdentity(identity) - } - - r.client = client - - // 设置事件处理器 - switch c := client.(type) { - case *transport.GrpcClient: - c.SetEventHandler(func(event transport.Event) { - switch event.Type { - case transport.EventConnected: - r.Logger.Info("Connected to manager gRPC server", "addr", event.Address) - go r.sendOnlineMessage() - case transport.EventDisconnected: - r.Logger.Info("Disconnected from manager gRPC server", "addr", event.Address) - case transport.EventConnectFailed: - r.Logger.Error(event.Error, "Failed to connect to manager gRPC server", "addr", event.Address) - } - }) - // Register processors with job scheduler - if r.jobScheduler != nil { - transport.RegisterDefaultProcessors(c, r.jobScheduler) - r.Logger.Info("Registered gRPC processors with job scheduler") - } else { - transport.RegisterDefaultProcessors(c, nil) - r.Logger.Info("Registered gRPC processors without job scheduler") - } - case *transport.NettyClient: - c.SetEventHandler(func(event transport.Event) { - switch event.Type { - case transport.EventConnected: - r.Logger.Info("Connected to manager netty server", "addr", event.Address) - go r.sendOnlineMessage() - case transport.EventDisconnected: - r.Logger.Info("Disconnected from manager netty server", "addr", event.Address) - case transport.EventConnectFailed: - r.Logger.Error(event.Error, "Failed to connect to manager netty server", "addr", event.Address) - } - }) - // Register processors with job scheduler - if r.jobScheduler != nil { - transport.RegisterDefaultNettyProcessors(c, r.jobScheduler) - r.Logger.Info("Registered netty processors with job scheduler") - } else { - transport.RegisterDefaultNettyProcessors(c, nil) - r.Logger.Info("Registered netty processors without job scheduler") - } - } - - if err := r.client.Start(); err != nil { - r.Logger.Error(err, "Failed to start transport client") - return err - } - - // 创建新的context用于监控关闭信号 - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - // 监听 ctx.Done 优雅关闭 - go func() { - <-ctx.Done() - r.Logger.Info("Shutting down transport client...") - _ = r.client.Shutdown() - }() - - // 阻塞直到 ctx.Done - <-ctx.Done() - return nil + // 初始化 Logger 如果它还没有被设置 + if r.Logger.IsZero() { + r.Logger = logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo) + } + r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", r.Info().Name) + r.Logger.Info("Starting transport client") + + // 构建 server 地址 + addr := fmt.Sprintf("%s:%s", r.Config.Collector.Manager.Host, r.Config.Collector.Manager.Port) + if addr == ":" { + // 如果配置为空,使用环境变量或默认值 + if v := os.Getenv("MANAGER_ADDR"); v != "" { + addr = v + } else { + addr = DefaultManagerAddr + } + } + + // 确定协议 + protocol := r.Config.Collector.Manager.Protocol + if protocol == "" { + if v := os.Getenv("MANAGER_PROTOCOL"); v != "" { + protocol = v + } else { + protocol = DefaultProtocol + } + } + + r.Logger.Info("Connecting to manager server", "addr", addr, "protocol", protocol) + + // 创建客户端 + factory := &transport.TransportClientFactory{} + client, err := factory.CreateClient(protocol, addr) + if err != nil { + r.Logger.Error(err, "Failed to create transport client") + return err + } + + // Set the identity on the client if it supports it + identity := r.Config.Collector.Identity + if identity == "" { + identity = DefaultIdentity + } + + if nettyClient, ok := client.(*transport.NettyClient); ok { + nettyClient.SetIdentity(identity) + } + + r.client = client + + // 设置事件处理器 + switch c := client.(type) { + case *transport.GrpcClient: + c.SetEventHandler(func(event transport.Event) { + switch event.Type { + case transport.EventConnected: + r.Logger.Info("Connected to manager gRPC server", "addr", event.Address) + go r.sendOnlineMessage() + case transport.EventDisconnected: + r.Logger.Info("Disconnected from manager gRPC server", "addr", event.Address) + case transport.EventConnectFailed: + r.Logger.Error(event.Error, "Failed to connect to manager gRPC server", "addr", event.Address) + } + }) + // Register processors with job scheduler + if r.jobScheduler != nil { + transport.RegisterDefaultProcessors(c, r.jobScheduler) + r.Logger.Info("Registered gRPC processors with job scheduler") + } else { + transport.RegisterDefaultProcessors(c, nil) + r.Logger.Info("Registered gRPC processors without job scheduler") + } + case *transport.NettyClient: + c.SetEventHandler(func(event transport.Event) { + switch event.Type { + case transport.EventConnected: + r.Logger.Info("Connected to manager netty server", "addr", event.Address) + go r.sendOnlineMessage() + case transport.EventDisconnected: + r.Logger.Info("Disconnected from manager netty server", "addr", event.Address) + case transport.EventConnectFailed: + r.Logger.Error(event.Error, "Failed to connect to manager netty server", "addr", event.Address) + } + }) + // Register processors with job scheduler + if r.jobScheduler != nil { + transport.RegisterDefaultNettyProcessors(c, r.jobScheduler) + r.Logger.Info("Registered netty processors with job scheduler") + } else { + transport.RegisterDefaultNettyProcessors(c, nil) + r.Logger.Info("Registered netty processors without job scheduler") + } + } + + if err := r.client.Start(); err != nil { + r.Logger.Error(err, "Failed to start transport client") + return err + } + + // 创建新的context用于监控关闭信号 + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // 监听 ctx.Done 优雅关闭 + go func() { + <-ctx.Done() + r.Logger.Info("Shutting down transport client...") + _ = r.client.Shutdown() + }() + + // 阻塞直到 ctx.Done + <-ctx.Done() + return nil } func (r *Runner) sendOnlineMessage() { - if r.client != nil && r.client.IsStarted() { - // Use the configured identity - identity := r.Config.Collector.Identity - if identity == "" { - identity = DefaultIdentity - } - - // Create CollectorInfo JSON structure as expected by Java server - mode := r.Config.Collector.Mode - if mode == "" { - mode = DefaultMode // Default mode as in Java version - } - - collectorInfo := map[string]interface{}{ - "name": identity, - "ip": "", // Let server detect IP - "version": "1.0.0", - "mode": mode, - } - - // Convert to JSON bytes - jsonData, err := json.Marshal(collectorInfo) - if err != nil { - r.Logger.Error(err, "Failed to marshal collector info to JSON") - return - } - - onlineMsg := &pb.Message{ - Type: pb.MessageType_GO_ONLINE, - Direction: pb.Direction_REQUEST, - Identity: identity, - Msg: jsonData, - } - - r.Logger.Info("Sending online message", "identity", identity, "type", onlineMsg.Type) - - if err := r.client.SendMsg(onlineMsg); err != nil { - r.Logger.Error(err, "Failed to send online message", "identity", identity) - } else { - r.Logger.Info("Online message sent successfully", "identity", identity) - } - } + if r.client != nil && r.client.IsStarted() { + // Use the configured identity + identity := r.Config.Collector.Identity + if identity == "" { + identity = DefaultIdentity + } + + // Create CollectorInfo JSON structure as expected by Java server + mode := r.Config.Collector.Mode + if mode == "" { + mode = DefaultMode // Default mode as in Java version + } + + collectorInfo := map[string]interface{}{ + "name": identity, + "ip": "", // Let server detect IP + "version": "1.0.0", + "mode": mode, + } + + // Convert to JSON bytes + jsonData, err := json.Marshal(collectorInfo) + if err != nil { + r.Logger.Error(err, "Failed to marshal collector info to JSON") + return + } + + onlineMsg := &pb.Message{ + Type: pb.MessageType_GO_ONLINE, + Direction: pb.Direction_REQUEST, + Identity: identity, + Msg: jsonData, + } + + r.Logger.Info("Sending online message", "identity", identity, "type", onlineMsg.Type) + + if err := r.client.SendMsg(onlineMsg); err != nil { + r.Logger.Error(err, "Failed to send online message", "identity", identity) + } else { + r.Logger.Info("Online message sent successfully", "identity", identity) + } + } } func (r *Runner) Info() collector.Info { - return collector.Info{ - Name: "transport", - } + return collector.Info{ + Name: "transport", + } } func (r *Runner) Close() error { - r.Logger.Info("transport close...") - if r.client != nil { - _ = r.client.Shutdown() - } - return nil + r.Logger.Info("transport close...") + if r.client != nil { + _ = r.client.Shutdown() + } + return nil } // GetClient returns the transport client (for testing and advanced usage) func (r *Runner) GetClient() transport.TransportClient { - return r.client + return r.client } // IsConnected returns whether the client is connected and started func (r *Runner) IsConnected() bool { - return r.client != nil && r.client.IsStarted() + return r.client != nil && r.client.IsStarted() } diff --git a/internal/collector/common/types/job/metrics_types.go b/internal/collector/common/types/job/metrics_types.go index 293811a..b783c21 100644 --- a/internal/collector/common/types/job/metrics_types.go +++ b/internal/collector/common/types/job/metrics_types.go @@ -259,11 +259,13 @@ type JMXProtocol struct { // RedisProtocol represents Redis protocol configuration type RedisProtocol struct { - Host string `json:"host"` - Port int `json:"port"` - Password string `json:"password"` - Pattern string `json:"pattern"` - Timeout int `json:"timeout"` + Host string `json:"host"` + Port string `json:"port"` + Username string `json:"username"` + Password string `json:"password"` + Pattern string `json:"pattern"` + Timeout string `json:"timeout"` + SSHTunnel *SSHTunnel `json:"sshTunnel,omitempty"` } // MongoDBProtocol represents MongoDB protocol configuration diff --git a/internal/util/param/param_replacer.go b/internal/util/param/param_replacer.go index 5e95eed..0aeec7d 100644 --- a/internal/util/param/param_replacer.go +++ b/internal/util/param/param_replacer.go @@ -148,6 +148,11 @@ func (r *Replacer) ReplaceMetricsParams(metrics *jobtypes.Metrics, paramMap map[ r.replaceHTTPParams(metrics.HTTP, paramMap) } + // 4. Redis + if metrics.Redis != nil { + r.replaceRedisParams(metrics.Redis, paramMap) + } + // Replace parameters in basic metrics fields if err := r.replaceBasicMetricsParams(metrics, paramMap); err != nil { return fmt.Errorf("failed to replace basic metrics params: %w", err) @@ -204,6 +209,22 @@ func (r *Replacer) replaceHTTPParams(http *jobtypes.HTTPProtocol, paramMap map[s } } +// replaceRedisParams specific replacement logic for RedisProtocol struct +func (r *Replacer) replaceRedisParams(redis *jobtypes.RedisProtocol, paramMap map[string]string) { + redis.Host = r.replaceParamPlaceholders(redis.Host, paramMap) + redis.Port = r.replaceParamPlaceholders(redis.Port, paramMap) + redis.Username = r.replaceParamPlaceholders(redis.Username, paramMap) + redis.Password = r.replaceParamPlaceholders(redis.Password, paramMap) + redis.Pattern = r.replaceParamPlaceholders(redis.Pattern, paramMap) + redis.Timeout = r.replaceParamPlaceholders(redis.Timeout, paramMap) + if redis.SSHTunnel != nil { + redis.SSHTunnel.Host = r.replaceParamPlaceholders(redis.SSHTunnel.Host, paramMap) + redis.SSHTunnel.Port = r.replaceParamPlaceholders(redis.SSHTunnel.Port, paramMap) + redis.SSHTunnel.Username = r.replaceParamPlaceholders(redis.SSHTunnel.Username, paramMap) + redis.SSHTunnel.Password = r.replaceParamPlaceholders(redis.SSHTunnel.Password, paramMap) + } +} + // replaceProtocolParams replaces parameters in any protocol configuration defined as interface{} func (r *Replacer) replaceProtocolParams(protocolInterface *interface{}, paramMap map[string]string) error { if *protocolInterface == nil { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
