This is an automated email from the ASF dual-hosted git repository. shown pushed a commit to branch 1211-yuluo/optimize-init1 in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
commit 4fe1e2a121f300d8780a49734a7a3971317fb008 Author: yuluo-yx <[email protected]> AuthorDate: Thu Dec 11 22:32:39 2025 +0800 feat: optimize imports Signed-off-by: yuluo-yx <[email protected]> --- cmd/main.go | 1 + .../collector/basic/database/jdbc_collector.go | 11 +- internal/collector/basic/http/http_collector.go | 1007 ++++++++++---------- internal/collector/basic/init.go | 69 -- internal/collector/basic/redis/redis_collector.go | 9 +- internal/collector/basic/ssh/ssh_collector.go | 7 + internal/collector/basic/standard/imports.go | 30 + .../common/collect/dispatch/metrics_collector.go | 4 - internal/collector/common/job/job_server.go | 6 +- 9 files changed, 566 insertions(+), 578 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index fb45f5e..2b497b4 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -22,6 +22,7 @@ import ( "os" "hertzbeat.apache.org/hertzbeat-collector-go/cmd/root" + _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/standard" ) func main() { diff --git a/internal/collector/basic/database/jdbc_collector.go b/internal/collector/basic/database/jdbc_collector.go index 841a0f7..ec8f179 100644 --- a/internal/collector/basic/database/jdbc_collector.go +++ b/internal/collector/basic/database/jdbc_collector.go @@ -33,12 +33,13 @@ import ( "time" _ "github.com/go-sql-driver/mysql" + "github.com/lib/pq" _ "github.com/lib/pq" - pq "github.com/lib/pq" _ "github.com/microsoft/go-mssqldb" - _ "github.com/sijms/go-ora/v2" // Oracle driver + _ "github.com/sijms/go-ora/v2" "golang.org/x/crypto/ssh" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy" jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" @@ -46,6 +47,12 @@ import ( "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param" ) +func init() { + strategy.RegisterFactory("jdbc", func(logger logger.Logger) strategy.Collector { + return NewJDBCCollector(logger) + }) +} + const ( ProtocolJDBC = "jdbc" diff --git a/internal/collector/basic/http/http_collector.go b/internal/collector/basic/http/http_collector.go index 846b6b6..32632a5 100644 --- a/internal/collector/basic/http/http_collector.go +++ b/internal/collector/basic/http/http_collector.go @@ -20,570 +20,577 @@ package http import ( - "bytes" - "crypto/md5" - "crypto/rand" - "crypto/tls" - "encoding/hex" - "encoding/json" - "fmt" - "io" - "net/http" - "strconv" - "strings" - "time" - - "github.com/prometheus/common/expfmt" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" + "bytes" + "crypto/md5" + "crypto/rand" + "crypto/tls" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "strconv" + "strings" + "time" + + "github.com/prometheus/common/expfmt" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) +func init() { + strategy.RegisterFactory(ProtocolHTTP, func(logger logger.Logger) strategy.Collector { + return NewHTTPCollector(logger) + }) +} + const ( - ProtocolHTTP = "http" - - // Parse Types - ParseTypeDefault = "default" - ParseTypeJsonPath = "jsonPath" - ParseTypePrometheus = "prometheus" - ParseTypeWebsite = "website" - ParseTypeHeader = "header" - - // Auth Types - AuthTypeBasic = "Basic Auth" - AuthTypeBearer = "Bearer Token" - AuthTypeDigest = "Digest Auth" + ProtocolHTTP = "http" + + // Parse Types + ParseTypeDefault = "default" + ParseTypeJsonPath = "jsonPath" + ParseTypePrometheus = "prometheus" + ParseTypeWebsite = "website" + ParseTypeHeader = "header" + + // Auth Types + AuthTypeBasic = "Basic Auth" + AuthTypeBearer = "Bearer Token" + AuthTypeDigest = "Digest Auth" ) type HTTPCollector struct { - logger logger.Logger + logger logger.Logger } func NewHTTPCollector(log logger.Logger) *HTTPCollector { - return &HTTPCollector{ - logger: log.WithName("http-collector"), - } + return &HTTPCollector{ + logger: log.WithName("http-collector"), + } } func (hc *HTTPCollector) Protocol() string { - return ProtocolHTTP + return ProtocolHTTP } func (hc *HTTPCollector) PreCheck(metrics *job.Metrics) error { - if metrics == nil || metrics.HTTP == nil { - return fmt.Errorf("http configuration is missing") - } - return nil + if metrics == nil || metrics.HTTP == nil { + return fmt.Errorf("http configuration is missing") + } + return nil } func (hc *HTTPCollector) Collect(metrics *job.Metrics) *job.CollectRepMetricsData { - start := time.Now() - - // MetricsCollector has already performed parameter replacement on metrics.HTTP - httpConfig := metrics.HTTP - - // 1. Prepare URL - targetURL := httpConfig.URL - - // Parse SSL bool string - isSSL := false - if httpConfig.SSL != "" { - if val, err := strconv.ParseBool(httpConfig.SSL); err == nil { - isSSL = val - } - } - - if targetURL == "" { - schema := "http" - if isSSL { - schema = "https" - } - // Use metrics.ConfigMap values if URL is empty - // Note: metrics.ConfigMap is already populated with job.Configmap values - host := metrics.ConfigMap["host"] - port := metrics.ConfigMap["port"] - if host != "" && port != "" { - targetURL = fmt.Sprintf("%s://%s:%s", schema, host, port) - } - } - - if targetURL != "" && !strings.HasPrefix(targetURL, "http") { - schema := "http" - if isSSL { - schema = "https" - } - targetURL = fmt.Sprintf("%s://%s", schema, targetURL) - } - - if targetURL == "" { - return hc.createFailResponse(metrics, constants.CollectFail, "target URL is empty") - } - - // 2. Create Request - req, err := hc.createRequest(httpConfig, targetURL) - if err != nil { - return hc.createFailResponse(metrics, constants.CollectFail, fmt.Sprintf("failed to create request: %v", err)) - } - - // 3. Create Client - timeoutStr := metrics.Timeout - if httpConfig.Timeout != "" { - timeoutStr = httpConfig.Timeout - } - - timeout := hc.getTimeout(timeoutStr) - client := &http.Client{ - Timeout: timeout, - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - }, - } - - // 4. Execute Request (with Digest retry logic) - resp, err := client.Do(req) - - // Handle Digest Auth Challenge (401) - if err == nil && resp.StatusCode == http.StatusUnauthorized && - httpConfig.Authorization != nil && - strings.EqualFold(httpConfig.Authorization.Type, AuthTypeDigest) { - - // Close the first response body - io.Copy(io.Discard, resp.Body) - resp.Body.Close() - - authHeader := resp.Header.Get("WWW-Authenticate") - if authHeader != "" { - // Create new request with Authorization header - digestReq, digestErr := hc.handleDigestAuth(req, authHeader, httpConfig.Authorization) - if digestErr == nil { - // Retry request - resp, err = client.Do(digestReq) - } else { - hc.logger.Error(digestErr, "failed to handle digest auth") - } - } - } - - if err != nil { - return hc.createFailResponse(metrics, constants.CollectUnReachable, fmt.Sprintf("request failed: %v", err)) - } - defer resp.Body.Close() - - responseTime := time.Since(start).Milliseconds() - - // 5. Parse Response - parseType := httpConfig.ParseType - if parseType == "" { - parseType = ParseTypeDefault - } - - var responseData *job.CollectRepMetricsData - - // Read body - bodyBytes, err := io.ReadAll(resp.Body) - if err != nil { - return hc.createFailResponse(metrics, constants.CollectFail, fmt.Sprintf("failed to read body: %v", err)) - } - - switch parseType { - case ParseTypePrometheus: - responseData, err = hc.parsePrometheus(bodyBytes, metrics) - case ParseTypeWebsite: - responseData, err = hc.parseWebsite(bodyBytes, resp.StatusCode, responseTime, metrics, httpConfig) - case ParseTypeHeader: - responseData, err = hc.parseHeader(resp.Header, metrics) - case ParseTypeJsonPath, ParseTypeDefault: - responseData, err = hc.parseJsonPath(bodyBytes, metrics, responseTime, httpConfig) - default: - responseData, err = hc.parseJsonPath(bodyBytes, metrics, responseTime, httpConfig) - } - - if err != nil { - hc.logger.Error(err, "parse response failed", "type", parseType) - return hc.createFailResponse(metrics, constants.CollectFail, fmt.Sprintf("parse error: %v", err)) - } - - hc.fillCommonFields(responseData, metrics.AliasFields, responseTime, resp.StatusCode) - - return responseData + start := time.Now() + + // MetricsCollector has already performed parameter replacement on metrics.HTTP + httpConfig := metrics.HTTP + + // 1. Prepare URL + targetURL := httpConfig.URL + + // Parse SSL bool string + isSSL := false + if httpConfig.SSL != "" { + if val, err := strconv.ParseBool(httpConfig.SSL); err == nil { + isSSL = val + } + } + + if targetURL == "" { + schema := "http" + if isSSL { + schema = "https" + } + // Use metrics.ConfigMap values if URL is empty + // Note: metrics.ConfigMap is already populated with job.Configmap values + host := metrics.ConfigMap["host"] + port := metrics.ConfigMap["port"] + if host != "" && port != "" { + targetURL = fmt.Sprintf("%s://%s:%s", schema, host, port) + } + } + + if targetURL != "" && !strings.HasPrefix(targetURL, "http") { + schema := "http" + if isSSL { + schema = "https" + } + targetURL = fmt.Sprintf("%s://%s", schema, targetURL) + } + + if targetURL == "" { + return hc.createFailResponse(metrics, constants.CollectFail, "target URL is empty") + } + + // 2. Create Request + req, err := hc.createRequest(httpConfig, targetURL) + if err != nil { + return hc.createFailResponse(metrics, constants.CollectFail, fmt.Sprintf("failed to create request: %v", err)) + } + + // 3. Create Client + timeoutStr := metrics.Timeout + if httpConfig.Timeout != "" { + timeoutStr = httpConfig.Timeout + } + + timeout := hc.getTimeout(timeoutStr) + client := &http.Client{ + Timeout: timeout, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + } + + // 4. Execute Request (with Digest retry logic) + resp, err := client.Do(req) + + // Handle Digest Auth Challenge (401) + if err == nil && resp.StatusCode == http.StatusUnauthorized && + httpConfig.Authorization != nil && + strings.EqualFold(httpConfig.Authorization.Type, AuthTypeDigest) { + + // Close the first response body + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + + authHeader := resp.Header.Get("WWW-Authenticate") + if authHeader != "" { + // Create new request with Authorization header + digestReq, digestErr := hc.handleDigestAuth(req, authHeader, httpConfig.Authorization) + if digestErr == nil { + // Retry request + resp, err = client.Do(digestReq) + } else { + hc.logger.Error(digestErr, "failed to handle digest auth") + } + } + } + + if err != nil { + return hc.createFailResponse(metrics, constants.CollectUnReachable, fmt.Sprintf("request failed: %v", err)) + } + defer resp.Body.Close() + + responseTime := time.Since(start).Milliseconds() + + // 5. Parse Response + parseType := httpConfig.ParseType + if parseType == "" { + parseType = ParseTypeDefault + } + + var responseData *job.CollectRepMetricsData + + // Read body + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + return hc.createFailResponse(metrics, constants.CollectFail, fmt.Sprintf("failed to read body: %v", err)) + } + + switch parseType { + case ParseTypePrometheus: + responseData, err = hc.parsePrometheus(bodyBytes, metrics) + case ParseTypeWebsite: + responseData, err = hc.parseWebsite(bodyBytes, resp.StatusCode, responseTime, metrics, httpConfig) + case ParseTypeHeader: + responseData, err = hc.parseHeader(resp.Header, metrics) + case ParseTypeJsonPath, ParseTypeDefault: + responseData, err = hc.parseJsonPath(bodyBytes, metrics, responseTime, httpConfig) + default: + responseData, err = hc.parseJsonPath(bodyBytes, metrics, responseTime, httpConfig) + } + + if err != nil { + hc.logger.Error(err, "parse response failed", "type", parseType) + return hc.createFailResponse(metrics, constants.CollectFail, fmt.Sprintf("parse error: %v", err)) + } + + hc.fillCommonFields(responseData, metrics.AliasFields, responseTime, resp.StatusCode) + + return responseData } // handleDigestAuth generates a new request with the Digest Authorization header func (hc *HTTPCollector) handleDigestAuth(originalReq *http.Request, authHeader string, authConfig *protocol.Authorization) (*http.Request, error) { - params := hc.parseAuthHeader(authHeader) - realm := params["realm"] - nonce := params["nonce"] - qop := params["qop"] - opaque := params["opaque"] - algorithm := params["algorithm"] - if algorithm == "" { - algorithm = "MD5" - } - - if realm == "" || nonce == "" { - return nil, fmt.Errorf("missing realm or nonce in digest header") - } - - username := authConfig.DigestAuthUsername - password := authConfig.DigestAuthPassword - ha1 := hc.md5Hex(fmt.Sprintf("%s:%s:%s", username, realm, password)) - - method := originalReq.Method - uri := originalReq.URL.RequestURI() - ha2 := hc.md5Hex(fmt.Sprintf("%s:%s", method, uri)) - - nc := "00000001" - cnonce := hc.generateCnonce() - - var responseStr string - if qop == "" { - responseStr = hc.md5Hex(fmt.Sprintf("%s:%s:%s", ha1, nonce, ha2)) - } else if strings.Contains(qop, "auth") { - responseStr = hc.md5Hex(fmt.Sprintf("%s:%s:%s:%s:%s:%s", ha1, nonce, nc, cnonce, "auth", ha2)) - } else { - return nil, fmt.Errorf("unsupported qop: %s", qop) - } - - authVal := fmt.Sprintf(`Digest username="%s", realm="%s", nonce="%s", uri="%s", response="%s", algorithm="%s"`, - username, realm, nonce, uri, responseStr, algorithm) - - if opaque != "" { - authVal += fmt.Sprintf(`, opaque="%s"`, opaque) - } - if qop != "" { - authVal += fmt.Sprintf(`, qop=auth, nc=%s, cnonce="%s"`, nc, cnonce) - } - - var newReq *http.Request - if originalReq.GetBody != nil { - body, _ := originalReq.GetBody() - newReq, _ = http.NewRequest(originalReq.Method, originalReq.URL.String(), body) - } else { - newReq, _ = http.NewRequest(originalReq.Method, originalReq.URL.String(), nil) - } - - for k, v := range originalReq.Header { - newReq.Header[k] = v - } - - newReq.Header.Set("Authorization", authVal) - return newReq, nil + params := hc.parseAuthHeader(authHeader) + realm := params["realm"] + nonce := params["nonce"] + qop := params["qop"] + opaque := params["opaque"] + algorithm := params["algorithm"] + if algorithm == "" { + algorithm = "MD5" + } + + if realm == "" || nonce == "" { + return nil, fmt.Errorf("missing realm or nonce in digest header") + } + + username := authConfig.DigestAuthUsername + password := authConfig.DigestAuthPassword + ha1 := hc.md5Hex(fmt.Sprintf("%s:%s:%s", username, realm, password)) + + method := originalReq.Method + uri := originalReq.URL.RequestURI() + ha2 := hc.md5Hex(fmt.Sprintf("%s:%s", method, uri)) + + nc := "00000001" + cnonce := hc.generateCnonce() + + var responseStr string + if qop == "" { + responseStr = hc.md5Hex(fmt.Sprintf("%s:%s:%s", ha1, nonce, ha2)) + } else if strings.Contains(qop, "auth") { + responseStr = hc.md5Hex(fmt.Sprintf("%s:%s:%s:%s:%s:%s", ha1, nonce, nc, cnonce, "auth", ha2)) + } else { + return nil, fmt.Errorf("unsupported qop: %s", qop) + } + + authVal := fmt.Sprintf(`Digest username="%s", realm="%s", nonce="%s", uri="%s", response="%s", algorithm="%s"`, + username, realm, nonce, uri, responseStr, algorithm) + + if opaque != "" { + authVal += fmt.Sprintf(`, opaque="%s"`, opaque) + } + if qop != "" { + authVal += fmt.Sprintf(`, qop=auth, nc=%s, cnonce="%s"`, nc, cnonce) + } + + var newReq *http.Request + if originalReq.GetBody != nil { + body, _ := originalReq.GetBody() + newReq, _ = http.NewRequest(originalReq.Method, originalReq.URL.String(), body) + } else { + newReq, _ = http.NewRequest(originalReq.Method, originalReq.URL.String(), nil) + } + + for k, v := range originalReq.Header { + newReq.Header[k] = v + } + + newReq.Header.Set("Authorization", authVal) + return newReq, nil } func (hc *HTTPCollector) parseAuthHeader(header string) map[string]string { - header = strings.TrimPrefix(header, "Digest ") - result := make(map[string]string) - - pairs := strings.Split(header, ",") - for _, pair := range pairs { - parts := strings.SplitN(strings.TrimSpace(pair), "=", 2) - if len(parts) == 2 { - key := strings.TrimSpace(parts[0]) - val := strings.TrimSpace(parts[1]) - val = strings.Trim(val, "\"") - result[key] = val - } - } - return result + header = strings.TrimPrefix(header, "Digest ") + result := make(map[string]string) + + pairs := strings.Split(header, ",") + for _, pair := range pairs { + parts := strings.SplitN(strings.TrimSpace(pair), "=", 2) + if len(parts) == 2 { + key := strings.TrimSpace(parts[0]) + val := strings.TrimSpace(parts[1]) + val = strings.Trim(val, "\"") + result[key] = val + } + } + return result } func (hc *HTTPCollector) md5Hex(s string) string { - hash := md5.Sum([]byte(s)) - return hex.EncodeToString(hash[:]) + hash := md5.Sum([]byte(s)) + return hex.EncodeToString(hash[:]) } func (hc *HTTPCollector) generateCnonce() string { - b := make([]byte, 8) - io.ReadFull(rand.Reader, b) - return hex.EncodeToString(b) + b := make([]byte, 8) + io.ReadFull(rand.Reader, b) + return hex.EncodeToString(b) } func (hc *HTTPCollector) createRequest(config *protocol.HTTPProtocol, targetURL string) (*http.Request, error) { - method := strings.ToUpper(config.Method) - if method == "" { - method = "GET" - } - - var body io.Reader - if config.Body != "" { - body = strings.NewReader(config.Body) - } - - req, err := http.NewRequest(method, targetURL, body) - if err != nil { - return nil, err - } - - for k, v := range config.Headers { - req.Header.Set(k, v) - } - - if config.Authorization != nil { - auth := config.Authorization - if auth.Type == "" || strings.EqualFold(auth.Type, AuthTypeBasic) { - if auth.BasicAuthUsername != "" && auth.BasicAuthPassword != "" { - req.SetBasicAuth(auth.BasicAuthUsername, auth.BasicAuthPassword) - } - } else if strings.EqualFold(auth.Type, AuthTypeBearer) { - if auth.BearerTokenToken != "" { - req.Header.Set("Authorization", "Bearer "+auth.BearerTokenToken) - } - } - } - - if len(config.Params) > 0 { - q := req.URL.Query() - for k, v := range config.Params { - q.Add(k, v) - } - req.URL.RawQuery = q.Encode() - } - - return req, nil + method := strings.ToUpper(config.Method) + if method == "" { + method = "GET" + } + + var body io.Reader + if config.Body != "" { + body = strings.NewReader(config.Body) + } + + req, err := http.NewRequest(method, targetURL, body) + if err != nil { + return nil, err + } + + for k, v := range config.Headers { + req.Header.Set(k, v) + } + + if config.Authorization != nil { + auth := config.Authorization + if auth.Type == "" || strings.EqualFold(auth.Type, AuthTypeBasic) { + if auth.BasicAuthUsername != "" && auth.BasicAuthPassword != "" { + req.SetBasicAuth(auth.BasicAuthUsername, auth.BasicAuthPassword) + } + } else if strings.EqualFold(auth.Type, AuthTypeBearer) { + if auth.BearerTokenToken != "" { + req.Header.Set("Authorization", "Bearer "+auth.BearerTokenToken) + } + } + } + + if len(config.Params) > 0 { + q := req.URL.Query() + for k, v := range config.Params { + q.Add(k, v) + } + req.URL.RawQuery = q.Encode() + } + + return req, nil } // --- Parsers --- func (hc *HTTPCollector) parsePrometheus(body []byte, metrics *job.Metrics) (*job.CollectRepMetricsData, error) { - response := hc.createSuccessResponse(metrics) - var parser expfmt.TextParser - metricFamilies, err := parser.TextToMetricFamilies(bytes.NewReader(body)) - if err != nil { - return nil, err - } - now := time.Now().UnixMilli() - for _, mf := range metricFamilies { - for _, m := range mf.Metric { - row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))} - labels := make(map[string]string) - for _, pair := range m.Label { - labels[pair.GetName()] = pair.GetValue() - } - hasValue := false - for i, field := range metrics.AliasFields { - if field == constants.NULL_VALUE { - continue - } - if field == "value" || field == "prom_value" { - if m.Gauge != nil { - row.Columns[i] = fmt.Sprintf("%f", m.Gauge.GetValue()) - } else if m.Counter != nil { - row.Columns[i] = fmt.Sprintf("%f", m.Counter.GetValue()) - } else if m.Untyped != nil { - row.Columns[i] = fmt.Sprintf("%f", m.Untyped.GetValue()) - } - hasValue = true - } else { - if val, ok := labels[field]; ok { - row.Columns[i] = val - hasValue = true - } else { - row.Columns[i] = constants.NULL_VALUE - } - } - } - if hasValue { - response.Values = append(response.Values, row) - } - } - } - response.Time = now - return response, nil + response := hc.createSuccessResponse(metrics) + var parser expfmt.TextParser + metricFamilies, err := parser.TextToMetricFamilies(bytes.NewReader(body)) + if err != nil { + return nil, err + } + now := time.Now().UnixMilli() + for _, mf := range metricFamilies { + for _, m := range mf.Metric { + row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))} + labels := make(map[string]string) + for _, pair := range m.Label { + labels[pair.GetName()] = pair.GetValue() + } + hasValue := false + for i, field := range metrics.AliasFields { + if field == constants.NULL_VALUE { + continue + } + if field == "value" || field == "prom_value" { + if m.Gauge != nil { + row.Columns[i] = fmt.Sprintf("%f", m.Gauge.GetValue()) + } else if m.Counter != nil { + row.Columns[i] = fmt.Sprintf("%f", m.Counter.GetValue()) + } else if m.Untyped != nil { + row.Columns[i] = fmt.Sprintf("%f", m.Untyped.GetValue()) + } + hasValue = true + } else { + if val, ok := labels[field]; ok { + row.Columns[i] = val + hasValue = true + } else { + row.Columns[i] = constants.NULL_VALUE + } + } + } + if hasValue { + response.Values = append(response.Values, row) + } + } + } + response.Time = now + return response, nil } func (hc *HTTPCollector) parseWebsite(body []byte, statusCode int, responseTime int64, metrics *job.Metrics, httpConfig *protocol.HTTPProtocol) (*job.CollectRepMetricsData, error) { - response := hc.createSuccessResponse(metrics) - row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))} - keyword := httpConfig.Keyword - keywordCount := 0 - if keyword != "" { - keywordCount = strings.Count(string(body), keyword) - } - for i, field := range metrics.AliasFields { - switch strings.ToLower(field) { - case strings.ToLower(constants.StatusCode): - row.Columns[i] = strconv.Itoa(statusCode) - case strings.ToLower(constants.RESPONSE_TIME): - row.Columns[i] = strconv.FormatInt(responseTime, 10) - case "keyword": - row.Columns[i] = strconv.Itoa(keywordCount) - default: - row.Columns[i] = constants.NULL_VALUE - } - } - response.Values = append(response.Values, row) - return response, nil + response := hc.createSuccessResponse(metrics) + row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))} + keyword := httpConfig.Keyword + keywordCount := 0 + if keyword != "" { + keywordCount = strings.Count(string(body), keyword) + } + for i, field := range metrics.AliasFields { + switch strings.ToLower(field) { + case strings.ToLower(constants.StatusCode): + row.Columns[i] = strconv.Itoa(statusCode) + case strings.ToLower(constants.RESPONSE_TIME): + row.Columns[i] = strconv.FormatInt(responseTime, 10) + case "keyword": + row.Columns[i] = strconv.Itoa(keywordCount) + default: + row.Columns[i] = constants.NULL_VALUE + } + } + response.Values = append(response.Values, row) + return response, nil } func (hc *HTTPCollector) parseHeader(header http.Header, metrics *job.Metrics) (*job.CollectRepMetricsData, error) { - response := hc.createSuccessResponse(metrics) - row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))} - for i, field := range metrics.AliasFields { - val := header.Get(field) - if val != "" { - row.Columns[i] = val - } else { - row.Columns[i] = constants.NULL_VALUE - } - } - response.Values = append(response.Values, row) - return response, nil + response := hc.createSuccessResponse(metrics) + row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))} + for i, field := range metrics.AliasFields { + val := header.Get(field) + if val != "" { + row.Columns[i] = val + } else { + row.Columns[i] = constants.NULL_VALUE + } + } + response.Values = append(response.Values, row) + return response, nil } func (hc *HTTPCollector) parseJsonPath(body []byte, metrics *job.Metrics, responseTime int64, httpConfig *protocol.HTTPProtocol) (*job.CollectRepMetricsData, error) { - response := hc.createSuccessResponse(metrics) - var data interface{} - decoder := json.NewDecoder(bytes.NewReader(body)) - decoder.UseNumber() - if err := decoder.Decode(&data); err != nil { - return response, nil - } - parseScript := httpConfig.ParseScript - root := data - if parseScript != "" && parseScript != "$" { - root = hc.navigateJson(data, parseScript) - } - if root == nil { - return response, nil - } - if arr, ok := root.([]interface{}); ok { - for _, item := range arr { - row := hc.extractRow(item, metrics.AliasFields) - response.Values = append(response.Values, row) - } - } else { - row := hc.extractRow(root, metrics.AliasFields) - response.Values = append(response.Values, row) - } - return response, nil + response := hc.createSuccessResponse(metrics) + var data interface{} + decoder := json.NewDecoder(bytes.NewReader(body)) + decoder.UseNumber() + if err := decoder.Decode(&data); err != nil { + return response, nil + } + parseScript := httpConfig.ParseScript + root := data + if parseScript != "" && parseScript != "$" { + root = hc.navigateJson(data, parseScript) + } + if root == nil { + return response, nil + } + if arr, ok := root.([]interface{}); ok { + for _, item := range arr { + row := hc.extractRow(item, metrics.AliasFields) + response.Values = append(response.Values, row) + } + } else { + row := hc.extractRow(root, metrics.AliasFields) + response.Values = append(response.Values, row) + } + return response, nil } func (hc *HTTPCollector) navigateJson(data interface{}, path string) interface{} { - path = strings.TrimPrefix(path, "$.") - if path == "" { - return data - } - parts := strings.Split(path, ".") - current := data - for _, part := range parts { - if current == nil { - return nil - } - key := part - idx := -1 - if i := strings.Index(part, "["); i > -1 && strings.HasSuffix(part, "]") { - key = part[:i] - idxStr := part[i+1 : len(part)-1] - if val, err := strconv.Atoi(idxStr); err == nil { - idx = val - } - } - if key != "" { - if m, ok := current.(map[string]interface{}); ok { - if val, exists := m[key]; exists { - current = val - } else { - return nil - } - } else { - return nil - } - } - if idx > -1 { - if arr, ok := current.([]interface{}); ok { - if idx < len(arr) { - current = arr[idx] - } else { - return nil - } - } else { - return nil - } - } - } - return current + path = strings.TrimPrefix(path, "$.") + if path == "" { + return data + } + parts := strings.Split(path, ".") + current := data + for _, part := range parts { + if current == nil { + return nil + } + key := part + idx := -1 + if i := strings.Index(part, "["); i > -1 && strings.HasSuffix(part, "]") { + key = part[:i] + idxStr := part[i+1 : len(part)-1] + if val, err := strconv.Atoi(idxStr); err == nil { + idx = val + } + } + if key != "" { + if m, ok := current.(map[string]interface{}); ok { + if val, exists := m[key]; exists { + current = val + } else { + return nil + } + } else { + return nil + } + } + if idx > -1 { + if arr, ok := current.([]interface{}); ok { + if idx < len(arr) { + current = arr[idx] + } else { + return nil + } + } else { + return nil + } + } + } + return current } func (hc *HTTPCollector) extractRow(item interface{}, fields []string) job.ValueRow { - row := job.ValueRow{Columns: make([]string, len(fields))} - m, isMap := item.(map[string]interface{}) - for i, field := range fields { - if strings.EqualFold(field, constants.RESPONSE_TIME) || strings.EqualFold(field, constants.StatusCode) { - row.Columns[i] = constants.NULL_VALUE - continue - } - var val interface{} - if isMap { - if v, ok := m[field]; ok { - val = v - } else { - val = hc.navigateJson(item, field) - } - } - if val != nil { - row.Columns[i] = fmt.Sprintf("%v", val) - } else { - row.Columns[i] = constants.NULL_VALUE - } - } - return row + row := job.ValueRow{Columns: make([]string, len(fields))} + m, isMap := item.(map[string]interface{}) + for i, field := range fields { + if strings.EqualFold(field, constants.RESPONSE_TIME) || strings.EqualFold(field, constants.StatusCode) { + row.Columns[i] = constants.NULL_VALUE + continue + } + var val interface{} + if isMap { + if v, ok := m[field]; ok { + val = v + } else { + val = hc.navigateJson(item, field) + } + } + if val != nil { + row.Columns[i] = fmt.Sprintf("%v", val) + } else { + row.Columns[i] = constants.NULL_VALUE + } + } + return row } func (hc *HTTPCollector) fillCommonFields(resp *job.CollectRepMetricsData, fields []string, responseTime int64, statusCode int) { - if resp == nil { - return - } - if len(resp.Fields) == 0 { - resp.Fields = make([]job.Field, len(fields)) - for i, f := range fields { - resp.Fields[i] = job.Field{Field: f, Type: constants.TYPE_STRING} - } - } - if len(resp.Values) == 0 { - row := job.ValueRow{Columns: make([]string, len(fields))} - for k := range row.Columns { - row.Columns[k] = constants.NULL_VALUE - } - resp.Values = append(resp.Values, row) - } - for i := range resp.Values { - for j, field := range fields { - if strings.EqualFold(field, constants.RESPONSE_TIME) { - resp.Values[i].Columns[j] = strconv.FormatInt(responseTime, 10) - } - if strings.EqualFold(field, constants.StatusCode) { - resp.Values[i].Columns[j] = strconv.Itoa(statusCode) - } - } - } + if resp == nil { + return + } + if len(resp.Fields) == 0 { + resp.Fields = make([]job.Field, len(fields)) + for i, f := range fields { + resp.Fields[i] = job.Field{Field: f, Type: constants.TYPE_STRING} + } + } + if len(resp.Values) == 0 { + row := job.ValueRow{Columns: make([]string, len(fields))} + for k := range row.Columns { + row.Columns[k] = constants.NULL_VALUE + } + resp.Values = append(resp.Values, row) + } + for i := range resp.Values { + for j, field := range fields { + if strings.EqualFold(field, constants.RESPONSE_TIME) { + resp.Values[i].Columns[j] = strconv.FormatInt(responseTime, 10) + } + if strings.EqualFold(field, constants.StatusCode) { + resp.Values[i].Columns[j] = strconv.Itoa(statusCode) + } + } + } } func (hc *HTTPCollector) getTimeout(timeoutStr string) time.Duration { - if timeoutStr == "" { - return 10 * time.Second - } - if val, err := strconv.Atoi(timeoutStr); err == nil { - return time.Duration(val) * time.Millisecond - } - return 10 * time.Second + if timeoutStr == "" { + return 10 * time.Second + } + if val, err := strconv.Atoi(timeoutStr); err == nil { + return time.Duration(val) * time.Millisecond + } + return 10 * time.Second } func (hc *HTTPCollector) createSuccessResponse(metrics *job.Metrics) *job.CollectRepMetricsData { - return &job.CollectRepMetricsData{ - Metrics: metrics.Name, - Time: time.Now().UnixMilli(), - Code: constants.CollectSuccess, - Msg: "success", - Values: make([]job.ValueRow, 0), - } + return &job.CollectRepMetricsData{ + Metrics: metrics.Name, + Time: time.Now().UnixMilli(), + Code: constants.CollectSuccess, + Msg: "success", + Values: make([]job.ValueRow, 0), + } } func (hc *HTTPCollector) createFailResponse(metrics *job.Metrics, code int, msg string) *job.CollectRepMetricsData { - return &job.CollectRepMetricsData{ - Metrics: metrics.Name, - Time: time.Now().UnixMilli(), - Code: code, - Msg: msg, - } + return &job.CollectRepMetricsData{ + Metrics: metrics.Name, + Time: time.Now().UnixMilli(), + Code: code, + Msg: msg, + } } diff --git a/internal/collector/basic/init.go b/internal/collector/basic/init.go deleted file mode 100644 index 3145b48..0000000 --- a/internal/collector/basic/init.go +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package basic - -import ( - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/database" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/http" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/redis" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/ssh" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" -) - -// init function is automatically executed when the package is imported -// It centrally registers factory functions for all protocols -func init() { - // Register factory functions for all protocols - // To add a new protocol, simply add a line here - strategy.RegisterFactory("jdbc", func(logger logger.Logger) strategy.Collector { - return database.NewJDBCCollector(logger) - }) - - strategy.RegisterFactory("ssh", func(logger logger.Logger) strategy.Collector { - return ssh.NewSSHCollector(logger) - }) - - strategy.RegisterFactory("http", func(logger logger.Logger) strategy.Collector { - return http.NewHTTPCollector(logger) - }) - - strategy.RegisterFactory("https", func(logger logger.Logger) strategy.Collector { - return http.NewHTTPCollector(logger) - }) - - strategy.RegisterFactory("redis", func(logger logger.Logger) strategy.Collector { - return redis.NewRedisCollector(logger) - }) - - // More protocols can be added here in the future: -} - -// InitializeAllCollectors initializes all registered collectors -// At this point, the init() function has already registered all factory functions -// This function creates the actual collector instances -func InitializeAllCollectors(logger logger.Logger) { - logger.Info("initializing all collectors") - - // Create collector instances using registered factory functions - strategy.InitializeCollectors(logger) - - logger.Info("all collectors initialized successfully") -} diff --git a/internal/collector/basic/redis/redis_collector.go b/internal/collector/basic/redis/redis_collector.go index 12553e4..9b23741 100644 --- a/internal/collector/basic/redis/redis_collector.go +++ b/internal/collector/basic/redis/redis_collector.go @@ -30,14 +30,21 @@ import ( "github.com/redis/go-redis/v9" "golang.org/x/crypto/ssh" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy" - sshhelper "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/ssh" jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" + sshhelper "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/ssh" ) +func init() { + strategy.RegisterFactory(ProtocolRedis, func(logger logger.Logger) strategy.Collector { + return NewRedisCollector(logger) + }) +} + const ( ProtocolRedis = "redis" ResponseTime = "responseTime" diff --git a/internal/collector/basic/ssh/ssh_collector.go b/internal/collector/basic/ssh/ssh_collector.go index 20e1bab..a611ded 100644 --- a/internal/collector/basic/ssh/ssh_collector.go +++ b/internal/collector/basic/ssh/ssh_collector.go @@ -26,6 +26,7 @@ import ( "strings" "time" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy" jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" @@ -34,6 +35,12 @@ import ( sshhelper "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/ssh" ) +func init() { + strategy.RegisterFactory(ProtocolSSH, func(logger logger.Logger) strategy.Collector { + return NewSSHCollector(logger) + }) +} + const ( ProtocolSSH = "ssh" diff --git a/internal/collector/basic/standard/imports.go b/internal/collector/basic/standard/imports.go new file mode 100644 index 0000000..5eb3217 --- /dev/null +++ b/internal/collector/basic/standard/imports.go @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package standard + +import ( + _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/database" + _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/http" + _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/redis" + _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/ssh" + + // extensions + _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/extension/milvus" +) diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go b/internal/collector/common/collect/dispatch/metrics_collector.go index 9b7aff2..7261ebe 100644 --- a/internal/collector/common/collect/dispatch/metrics_collector.go +++ b/internal/collector/common/collect/dispatch/metrics_collector.go @@ -29,12 +29,8 @@ import ( "github.com/expr-lang/expr" "github.com/expr-lang/expr/vm" - // Import basic package with blank identifier to trigger its init() function - // This ensures all collector factories are registered automatically - _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic" "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy" jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" - _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/extension/milvus" "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param" diff --git a/internal/collector/common/job/job_server.go b/internal/collector/common/job/job_server.go index 8702830..4451ffa 100644 --- a/internal/collector/common/job/job_server.go +++ b/internal/collector/common/job/job_server.go @@ -25,9 +25,9 @@ import ( "fmt" "sync" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic" "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect" "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/dispatch" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy" "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/dispatcher" clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server" "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector" @@ -150,7 +150,9 @@ func (r *Runner) Start(ctx context.Context) error { // Initialize all collectors r.Logger.Info("Initializing collectors...") - basic.InitializeAllCollectors(r.Logger) + + // Create collector instances using registered factory functions + strategy.InitializeCollectors(r.Logger) // Start the time dispatcher if r.TimeDispatch != nil { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
