This is an automated email from the ASF dual-hosted git repository.
shown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
The following commit(s) were added to refs/heads/main by this push:
new 3c93606 feat: add HTTP collector with support for various
authentication methods and response parsing (#29)
3c93606 is described below
commit 3c9360669ae74d7d345b94fd018600b7f0297568
Author: Logic <[email protected]>
AuthorDate: Wed Nov 26 20:46:45 2025 +0800
feat: add HTTP collector with support for various authentication methods
and response parsing (#29)
---
internal/collector/basic/http/.keep | 0
internal/collector/basic/http/http_collector.go | 588 +++++++++++++++++++++
internal/collector/basic/init.go | 14 +-
.../common/collect/dispatch/metrics_collector.go | 49 +-
.../collector/common/types/job/metrics_types.go | 56 +-
internal/util/param/param_replacer.go | 182 +++----
6 files changed, 738 insertions(+), 151 deletions(-)
diff --git a/internal/collector/basic/http/.keep
b/internal/collector/basic/http/.keep
deleted file mode 100644
index e69de29..0000000
diff --git a/internal/collector/basic/http/http_collector.go
b/internal/collector/basic/http/http_collector.go
new file mode 100644
index 0000000..1c065f0
--- /dev/null
+++ b/internal/collector/basic/http/http_collector.go
@@ -0,0 +1,588 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package http
+
+import (
+ "bytes"
+ "crypto/md5"
+ "crypto/rand"
+ "crypto/tls"
+ "encoding/hex"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/prometheus/common/expfmt"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+const (
+	// ProtocolHTTP is the protocol name reported by HTTPCollector.Protocol.
+	ProtocolHTTP = "http"
+
+	// Parse Types
+	// These values are matched against HTTPProtocol.ParseType in Collect to
+	// choose the response parser; an empty ParseType is treated as
+	// ParseTypeDefault.
+	ParseTypeDefault    = "default"
+	ParseTypeJsonPath   = "jsonPath"
+	ParseTypePrometheus = "prometheus"
+	ParseTypeWebsite    = "website"
+	ParseTypeHeader     = "header"
+
+	// Auth Types
+	// These values are compared case-insensitively against Authorization.Type.
+	AuthTypeBasic  = "Basic Auth"
+	AuthTypeBearer = "Bearer Token"
+	AuthTypeDigest = "Digest Auth"
+)
+
+// HTTPCollector collects metrics by issuing an HTTP request and parsing the
+// response. Its only state is the logger used to report failures.
+type HTTPCollector struct {
+	logger logger.Logger
+}
+
+// NewHTTPCollector builds an HTTPCollector whose logger is derived from log
+// and named "http-collector".
+func NewHTTPCollector(log logger.Logger) *HTTPCollector {
+	collector := new(HTTPCollector)
+	collector.logger = log.WithName("http-collector")
+	return collector
+}
+
+// Protocol returns the protocol name handled by this collector (ProtocolHTTP).
+func (hc *HTTPCollector) Protocol() string {
+	return ProtocolHTTP
+}
+
+// PreCheck verifies that metrics is non-nil and carries an HTTP configuration.
+// It returns an error when either is missing and nil otherwise.
+func (hc *HTTPCollector) PreCheck(metrics *job.Metrics) error {
+	configured := metrics != nil && metrics.HTTP != nil
+	if configured {
+		return nil
+	}
+	return fmt.Errorf("http configuration is missing")
+}
+
+// Collect executes the HTTP request described by metrics.HTTP and converts the
+// response into a CollectRepMetricsData.
+//
+// Steps:
+//  1. Resolve the target URL (explicit URL, or host/port from metrics.ConfigMap).
+//  2. Build the request (method, headers, body, Basic/Bearer auth, query params).
+//  3. Send it; on a 401 with Digest auth configured, answer the challenge once.
+//  4. Parse the body according to ParseType and fill response time / status code.
+//
+// Failures are reported through the returned data's Code and Msg; the method
+// itself never returns nil.
+func (hc *HTTPCollector) Collect(metrics *job.Metrics) *job.CollectRepMetricsData {
+	start := time.Now()
+
+	// MetricsCollector has already performed parameter replacement on metrics.HTTP
+	httpConfig := metrics.HTTP
+
+	// 1. Prepare URL
+	targetURL := httpConfig.URL
+
+	// SSL is carried as a string; anything strconv.ParseBool rejects counts as false.
+	isSSL := false
+	if httpConfig.SSL != "" {
+		if val, err := strconv.ParseBool(httpConfig.SSL); err == nil {
+			isSSL = val
+		}
+	}
+	schema := "http"
+	if isSSL {
+		schema = "https"
+	}
+
+	if targetURL == "" {
+		// Use metrics.ConfigMap values if URL is empty
+		// Note: metrics.ConfigMap is already populated with job.Configmap values
+		host := metrics.ConfigMap["host"]
+		port := metrics.ConfigMap["port"]
+		if host != "" && port != "" {
+			targetURL = fmt.Sprintf("%s://%s:%s", schema, host, port)
+		}
+	}
+
+	if targetURL != "" {
+		// Require a full "http://" or "https://" prefix. Checking only for "http"
+		// would wrongly accept scheme-less hosts such as "httpbin.org/get".
+		lowered := strings.ToLower(targetURL)
+		if !strings.HasPrefix(lowered, "http://") && !strings.HasPrefix(lowered, "https://") {
+			targetURL = fmt.Sprintf("%s://%s", schema, targetURL)
+		}
+	}
+
+	if targetURL == "" {
+		return hc.createFailResponse(metrics, constants.CollectFail, "target URL is empty")
+	}
+
+	// 2. Create Request
+	req, err := hc.createRequest(httpConfig, targetURL)
+	if err != nil {
+		return hc.createFailResponse(metrics, constants.CollectFail, fmt.Sprintf("failed to create request: %v", err))
+	}
+
+	// 3. Create Client
+	// The protocol-level timeout overrides the metrics-level one when set.
+	timeoutStr := metrics.Timeout
+	if httpConfig.Timeout != "" {
+		timeoutStr = httpConfig.Timeout
+	}
+
+	timeout := hc.getTimeout(timeoutStr)
+	transport := &http.Transport{
+		// NOTE(review): certificate verification is disabled for every target.
+		// Confirm this is intended (e.g. to monitor self-signed endpoints).
+		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+	}
+	// The transport lives only for this call; release its pooled connections
+	// so repeated collections do not accumulate idle sockets.
+	defer transport.CloseIdleConnections()
+	client := &http.Client{
+		Timeout:   timeout,
+		Transport: transport,
+	}
+
+	// 4. Execute Request (with Digest retry logic)
+	resp, err := client.Do(req)
+
+	// Handle Digest Auth Challenge (401)
+	if err == nil && resp.StatusCode == http.StatusUnauthorized &&
+		httpConfig.Authorization != nil &&
+		strings.EqualFold(httpConfig.Authorization.Type, AuthTypeDigest) {
+
+		if authHeader := resp.Header.Get("WWW-Authenticate"); authHeader != "" {
+			digestReq, digestErr := hc.handleDigestAuth(req, authHeader, httpConfig.Authorization)
+			if digestErr != nil {
+				hc.logger.Error(digestErr, "failed to handle digest auth")
+			} else {
+				// Only discard the first response once a retry is certain;
+				// otherwise the 401 response is still parsed below.
+				_, _ = io.Copy(io.Discard, resp.Body)
+				resp.Body.Close()
+				resp, err = client.Do(digestReq)
+			}
+		}
+	}
+
+	if err != nil {
+		return hc.createFailResponse(metrics, constants.CollectUnReachable, fmt.Sprintf("request failed: %v", err))
+	}
+	defer resp.Body.Close()
+
+	responseTime := time.Since(start).Milliseconds()
+
+	// 5. Parse Response
+	parseType := httpConfig.ParseType
+	if parseType == "" {
+		parseType = ParseTypeDefault
+	}
+
+	// Read body
+	bodyBytes, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return hc.createFailResponse(metrics, constants.CollectFail, fmt.Sprintf("failed to read body: %v", err))
+	}
+
+	var responseData *job.CollectRepMetricsData
+	switch parseType {
+	case ParseTypePrometheus:
+		responseData, err = hc.parsePrometheus(bodyBytes, metrics)
+	case ParseTypeWebsite:
+		responseData, err = hc.parseWebsite(bodyBytes, resp.StatusCode, responseTime, metrics, httpConfig)
+	case ParseTypeHeader:
+		responseData, err = hc.parseHeader(resp.Header, metrics)
+	default:
+		// ParseTypeJsonPath, ParseTypeDefault and any unrecognised parse type
+		// are all handled by the JSON path parser.
+		responseData, err = hc.parseJsonPath(bodyBytes, metrics, responseTime, httpConfig)
+	}
+
+	if err != nil {
+		hc.logger.Error(err, "parse response failed", "type", parseType)
+		return hc.createFailResponse(metrics, constants.CollectFail, fmt.Sprintf("parse error: %v", err))
+	}
+
+	hc.fillCommonFields(responseData, metrics.AliasFields, responseTime, resp.StatusCode)
+
+	return responseData
+}
+
+// handleDigestAuth answers a Digest challenge. It parses the WWW-Authenticate
+// value in authHeader, computes the MD5 digest response from the credentials
+// in authConfig, and returns a copy of originalReq carrying the resulting
+// Authorization header.
+//
+// Only MD5 hashing and the "auth" quality of protection (or no qop at all)
+// are implemented. An error is returned when the challenge lacks realm or
+// nonce, when it offers only unsupported qop values, or when the request body
+// cannot be replayed.
+func (hc *HTTPCollector) handleDigestAuth(originalReq *http.Request, authHeader string, authConfig *job.Authorization) (*http.Request, error) {
+	if authConfig == nil {
+		return nil, fmt.Errorf("missing digest auth configuration")
+	}
+
+	params := hc.parseAuthHeader(authHeader)
+	realm := params["realm"]
+	nonce := params["nonce"]
+	qop := params["qop"]
+	opaque := params["opaque"]
+	algorithm := params["algorithm"]
+	if algorithm == "" {
+		algorithm = "MD5"
+	}
+
+	if realm == "" || nonce == "" {
+		return nil, fmt.Errorf("missing realm or nonce in digest header")
+	}
+
+	// qop is a comma-separated list of options. Match the exact "auth" token so
+	// that a challenge offering only "auth-int" is not mistaken for "auth".
+	useQopAuth := false
+	for _, option := range strings.Split(qop, ",") {
+		if strings.EqualFold(strings.TrimSpace(option), "auth") {
+			useQopAuth = true
+			break
+		}
+	}
+	if qop != "" && !useQopAuth {
+		return nil, fmt.Errorf("unsupported qop: %s", qop)
+	}
+
+	username := authConfig.DigestAuthUsername
+	password := authConfig.DigestAuthPassword
+	ha1 := hc.md5Hex(fmt.Sprintf("%s:%s:%s", username, realm, password))
+
+	method := originalReq.Method
+	uri := originalReq.URL.RequestURI()
+	ha2 := hc.md5Hex(fmt.Sprintf("%s:%s", method, uri))
+
+	// A fresh request is built for every challenge, so the nonce count is always 1.
+	nc := "00000001"
+	cnonce := hc.generateCnonce()
+
+	var responseStr string
+	if useQopAuth {
+		responseStr = hc.md5Hex(fmt.Sprintf("%s:%s:%s:%s:%s:%s", ha1, nonce, nc, cnonce, "auth", ha2))
+	} else {
+		responseStr = hc.md5Hex(fmt.Sprintf("%s:%s:%s", ha1, nonce, ha2))
+	}
+
+	authVal := fmt.Sprintf(`Digest username="%s", realm="%s", nonce="%s", uri="%s", response="%s", algorithm="%s"`,
+		username, realm, nonce, uri, responseStr, algorithm)
+
+	if opaque != "" {
+		authVal += fmt.Sprintf(`, opaque="%s"`, opaque)
+	}
+	if useQopAuth {
+		authVal += fmt.Sprintf(`, qop=auth, nc=%s, cnonce="%s"`, nc, cnonce)
+	}
+
+	// Clone keeps method, URL, ContentLength and GetBody, and deep-copies the
+	// headers so the retry never shares slices with the original request.
+	newReq := originalReq.Clone(originalReq.Context())
+	if originalReq.GetBody != nil {
+		body, err := originalReq.GetBody()
+		if err != nil {
+			return nil, fmt.Errorf("failed to rewind request body for digest retry: %w", err)
+		}
+		newReq.Body = body
+	} else {
+		// The original body (if any) was consumed and cannot be replayed.
+		newReq.Body = nil
+		newReq.ContentLength = 0
+	}
+	if newReq.Header == nil {
+		newReq.Header = make(http.Header)
+	}
+
+	newReq.Header.Set("Authorization", authVal)
+	return newReq, nil
+}
+
+// parseAuthHeader splits a Digest WWW-Authenticate value into its key/value
+// parameters. A leading "Digest " scheme is removed (case-insensitively),
+// parameter names are lower-cased, and surrounding double quotes are stripped
+// from values. Commas inside a quoted value (for example qop="auth,auth-int")
+// do not split the parameter. Entries without "=" are ignored.
+func (hc *HTTPCollector) parseAuthHeader(header string) map[string]string {
+	const scheme = "Digest "
+	header = strings.TrimSpace(header)
+	if len(header) >= len(scheme) && strings.EqualFold(header[:len(scheme)], scheme) {
+		header = header[len(scheme):]
+	}
+
+	result := make(map[string]string)
+	addPair := func(pair string) {
+		parts := strings.SplitN(strings.TrimSpace(pair), "=", 2)
+		if len(parts) != 2 {
+			return
+		}
+		key := strings.ToLower(strings.TrimSpace(parts[0]))
+		val := strings.Trim(strings.TrimSpace(parts[1]), "\"")
+		result[key] = val
+	}
+
+	// Walk the header once, splitting on commas that are outside quotes.
+	inQuotes := false
+	start := 0
+	for i := 0; i < len(header); i++ {
+		switch header[i] {
+		case '\\':
+			// Skip the character escaped by a backslash inside a quoted string.
+			if inQuotes && i+1 < len(header) {
+				i++
+			}
+		case '"':
+			inQuotes = !inQuotes
+		case ',':
+			if !inQuotes {
+				addPair(header[start:i])
+				start = i + 1
+			}
+		}
+	}
+	addPair(header[start:])
+
+	return result
+}
+
+// md5Hex returns the MD5 digest of s as a lower-case hexadecimal string.
+func (hc *HTTPCollector) md5Hex(s string) string {
+	digest := md5.Sum([]byte(s))
+	return fmt.Sprintf("%x", digest)
+}
+
+// generateCnonce returns a client nonce for Digest authentication: 8 random
+// bytes from crypto/rand, hex encoded. If the random source fails, a value
+// derived from the current time is used instead of silently returning an
+// all-zero nonce.
+func (hc *HTTPCollector) generateCnonce() string {
+	b := make([]byte, 8)
+	if _, err := io.ReadFull(rand.Reader, b); err != nil {
+		return strconv.FormatInt(time.Now().UnixNano(), 16)
+	}
+	return hex.EncodeToString(b)
+}
+
+// createRequest builds the outgoing request for targetURL from config.
+//
+// The method defaults to GET and is upper-cased. A non-empty Body is sent as
+// the request body. Configured headers are applied, then Basic or Bearer
+// credentials (Digest is negotiated later, after a 401 challenge), and finally
+// the configured Params are appended to the URL query string.
+func (hc *HTTPCollector) createRequest(config *job.HTTPProtocol, targetURL string) (*http.Request, error) {
+	method := strings.ToUpper(config.Method)
+	if method == "" {
+		method = "GET"
+	}
+
+	var body io.Reader
+	if config.Body != "" {
+		body = strings.NewReader(config.Body)
+	}
+
+	req, err := http.NewRequest(method, targetURL, body)
+	if err != nil {
+		return nil, err
+	}
+
+	for k, v := range config.Headers {
+		// net/http ignores a "Host" entry in the header map on client requests;
+		// the override has to go through req.Host to take effect.
+		if strings.EqualFold(k, "Host") {
+			req.Host = v
+			continue
+		}
+		req.Header.Set(k, v)
+	}
+
+	if config.Authorization != nil {
+		auth := config.Authorization
+		switch {
+		case auth.Type == "" || strings.EqualFold(auth.Type, AuthTypeBasic):
+			// An unspecified auth type is treated as Basic.
+			if auth.BasicAuthUsername != "" && auth.BasicAuthPassword != "" {
+				req.SetBasicAuth(auth.BasicAuthUsername, auth.BasicAuthPassword)
+			}
+		case strings.EqualFold(auth.Type, AuthTypeBearer):
+			if auth.BearerTokenToken != "" {
+				req.Header.Set("Authorization", "Bearer "+auth.BearerTokenToken)
+			}
+		}
+	}
+
+	if len(config.Params) > 0 {
+		// Merge with any query already present in targetURL.
+		q := req.URL.Query()
+		for k, v := range config.Params {
+			q.Add(k, v)
+		}
+		req.URL.RawQuery = q.Encode()
+	}
+
+	return req, nil
+}
+
+// --- Parsers ---
+
+// parsePrometheus parses body as Prometheus text exposition format and emits
+// one row per sample.
+//
+// For each alias field: "value" or "prom_value" receives the sample's gauge,
+// counter or untyped value; any other field receives the label of that name,
+// or constants.NULL_VALUE when the label is absent. Fields equal to
+// constants.NULL_VALUE are skipped. A row is kept only when at least one
+// column was actually populated. It returns an error when the body is not
+// valid exposition text.
+func (hc *HTTPCollector) parsePrometheus(body []byte, metrics *job.Metrics) (*job.CollectRepMetricsData, error) {
+	response := hc.createSuccessResponse(metrics)
+	var parser expfmt.TextParser
+	metricFamilies, err := parser.TextToMetricFamilies(bytes.NewReader(body))
+	if err != nil {
+		return nil, err
+	}
+	now := time.Now().UnixMilli()
+	for _, mf := range metricFamilies {
+		for _, m := range mf.Metric {
+			row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))}
+			labels := make(map[string]string, len(m.Label))
+			for _, pair := range m.Label {
+				labels[pair.GetName()] = pair.GetValue()
+			}
+
+			// Resolve the sample value once. Samples without a gauge, counter
+			// or untyped value have no single value to report.
+			sample, sampleOK := 0.0, true
+			switch {
+			case m.Gauge != nil:
+				sample = m.Gauge.GetValue()
+			case m.Counter != nil:
+				sample = m.Counter.GetValue()
+			case m.Untyped != nil:
+				sample = m.Untyped.GetValue()
+			default:
+				sampleOK = false
+			}
+
+			hasValue := false
+			for i, field := range metrics.AliasFields {
+				if field == constants.NULL_VALUE {
+					continue
+				}
+				if field == "value" || field == "prom_value" {
+					if sampleOK {
+						// Shortest exact representation; "%f" would truncate
+						// to six decimals and turn small values into 0.000000.
+						row.Columns[i] = strconv.FormatFloat(sample, 'f', -1, 64)
+						hasValue = true
+					} else {
+						row.Columns[i] = constants.NULL_VALUE
+					}
+					continue
+				}
+				if val, ok := labels[field]; ok {
+					row.Columns[i] = val
+					hasValue = true
+				} else {
+					row.Columns[i] = constants.NULL_VALUE
+				}
+			}
+			if hasValue {
+				response.Values = append(response.Values, row)
+			}
+		}
+	}
+	response.Time = now
+	return response, nil
+}
+
+// parseWebsite produces a single row describing a website check. Matching on
+// the lower-cased alias field, the status-code column receives statusCode, the
+// response-time column receives responseTime, and "keyword" receives the
+// number of occurrences of httpConfig.Keyword in body (0 when no keyword is
+// configured). Every other column is constants.NULL_VALUE.
+func (hc *HTTPCollector) parseWebsite(body []byte, statusCode int, responseTime int64, metrics *job.Metrics, httpConfig *job.HTTPProtocol) (*job.CollectRepMetricsData, error) {
+	result := hc.createSuccessResponse(metrics)
+
+	occurrences := 0
+	if needle := httpConfig.Keyword; needle != "" {
+		occurrences = bytes.Count(body, []byte(needle))
+	}
+
+	statusKey := strings.ToLower(constants.StatusCode)
+	timeKey := strings.ToLower(constants.RESPONSE_TIME)
+
+	columns := make([]string, len(metrics.AliasFields))
+	for idx, name := range metrics.AliasFields {
+		lowered := strings.ToLower(name)
+		if lowered == statusKey {
+			columns[idx] = strconv.Itoa(statusCode)
+		} else if lowered == timeKey {
+			columns[idx] = strconv.FormatInt(responseTime, 10)
+		} else if lowered == "keyword" {
+			columns[idx] = strconv.Itoa(occurrences)
+		} else {
+			columns[idx] = constants.NULL_VALUE
+		}
+	}
+
+	result.Values = append(result.Values, job.ValueRow{Columns: columns})
+	return result, nil
+}
+
+// parseHeader produces a single row in which each alias field is looked up as
+// a response header name. Missing or empty headers yield constants.NULL_VALUE.
+func (hc *HTTPCollector) parseHeader(header http.Header, metrics *job.Metrics) (*job.CollectRepMetricsData, error) {
+	result := hc.createSuccessResponse(metrics)
+
+	columns := make([]string, 0, len(metrics.AliasFields))
+	for _, name := range metrics.AliasFields {
+		value := header.Get(name)
+		if value == "" {
+			value = constants.NULL_VALUE
+		}
+		columns = append(columns, value)
+	}
+
+	result.Values = append(result.Values, job.ValueRow{Columns: columns})
+	return result, nil
+}
+
+// parseJsonPath decodes body as JSON (numbers kept as json.Number) and
+// extracts rows from it. When httpConfig.ParseScript is set to something other
+// than "$", it selects the node to read via navigateJson. An array node yields
+// one row per element; any other node yields a single row. A body that is not
+// valid JSON, or a script that selects nothing, yields a success response with
+// no rows. The returned error is always nil.
+func (hc *HTTPCollector) parseJsonPath(body []byte, metrics *job.Metrics, responseTime int64, httpConfig *job.HTTPProtocol) (*job.CollectRepMetricsData, error) {
+	result := hc.createSuccessResponse(metrics)
+
+	var document interface{}
+	dec := json.NewDecoder(bytes.NewReader(body))
+	dec.UseNumber()
+	if decodeErr := dec.Decode(&document); decodeErr != nil {
+		// Undecodable bodies are not treated as a failure.
+		return result, nil
+	}
+
+	target := document
+	if script := httpConfig.ParseScript; script != "" && script != "$" {
+		target = hc.navigateJson(document, script)
+	}
+
+	switch node := target.(type) {
+	case nil:
+		return result, nil
+	case []interface{}:
+		for _, element := range node {
+			result.Values = append(result.Values, hc.extractRow(element, metrics.AliasFields))
+		}
+	default:
+		result.Values = append(result.Values, hc.extractRow(node, metrics.AliasFields))
+	}
+	return result, nil
+}
+
+// navigateJson walks data along a dotted path such as "$.a.b[0].c" and returns
+// the node it reaches, or nil when any step cannot be resolved.
+//
+// A leading "$." is optional. Each segment is an object key, optionally
+// followed by a single "[n]" array index; a segment with an empty key (for
+// example "[0]") indexes the current node directly. An index that is not a
+// non-negative integer is ignored, leaving the node selected by the key.
+func (hc *HTTPCollector) navigateJson(data interface{}, path string) interface{} {
+	trimmed := strings.TrimPrefix(path, "$.")
+	if trimmed == "" {
+		return data
+	}
+
+	node := data
+	for _, segment := range strings.Split(trimmed, ".") {
+		if node == nil {
+			return nil
+		}
+
+		// Separate "name[index]" into its two halves.
+		name, index := segment, -1
+		if open := strings.Index(segment, "["); open >= 0 && strings.HasSuffix(segment, "]") {
+			name = segment[:open]
+			if n, err := strconv.Atoi(segment[open+1 : len(segment)-1]); err == nil {
+				index = n
+			}
+		}
+
+		if name != "" {
+			object, isObject := node.(map[string]interface{})
+			if !isObject {
+				return nil
+			}
+			child, found := object[name]
+			if !found {
+				return nil
+			}
+			node = child
+		}
+
+		if index >= 0 {
+			list, isList := node.([]interface{})
+			if !isList || index >= len(list) {
+				return nil
+			}
+			node = list[index]
+		}
+	}
+	return node
+}
+
+// extractRow builds one row from a decoded JSON node. For each field it tries
+// an exact key lookup on the object, then falls back to treating the field as
+// a path for navigateJson. Response-time and status-code fields, fields that
+// resolve to nothing, and every field when item is not a JSON object are set
+// to constants.NULL_VALUE.
+func (hc *HTTPCollector) extractRow(item interface{}, fields []string) job.ValueRow {
+	columns := make([]string, len(fields))
+	object, isObject := item.(map[string]interface{})
+
+	for idx, name := range fields {
+		columns[idx] = constants.NULL_VALUE
+
+		// These two columns are filled in later from the HTTP exchange itself.
+		reserved := strings.EqualFold(name, constants.RESPONSE_TIME) || strings.EqualFold(name, constants.StatusCode)
+		if reserved || !isObject {
+			continue
+		}
+
+		value, present := object[name]
+		if !present {
+			value = hc.navigateJson(item, name)
+		}
+		if value != nil {
+			columns[idx] = fmt.Sprintf("%v", value)
+		}
+	}
+
+	return job.ValueRow{Columns: columns}
+}
+
+// fillCommonFields completes resp in place. It creates string-typed field
+// descriptors when resp has none, adds one all-NULL row when resp has no rows,
+// and then writes responseTime and statusCode into the matching columns of
+// every row. Field names are matched case-insensitively. A nil resp is left
+// untouched.
+func (hc *HTTPCollector) fillCommonFields(resp *job.CollectRepMetricsData, fields []string, responseTime int64, statusCode int) {
+	if resp == nil {
+		return
+	}
+
+	if len(resp.Fields) == 0 {
+		described := make([]job.Field, 0, len(fields))
+		for _, name := range fields {
+			described = append(described, job.Field{Field: name, Type: constants.TYPE_STRING})
+		}
+		resp.Fields = described
+	}
+
+	if len(resp.Values) == 0 {
+		blank := make([]string, len(fields))
+		for pos := range blank {
+			blank[pos] = constants.NULL_VALUE
+		}
+		resp.Values = append(resp.Values, job.ValueRow{Columns: blank})
+	}
+
+	elapsed := strconv.FormatInt(responseTime, 10)
+	code := strconv.Itoa(statusCode)
+	for col, name := range fields {
+		isTime := strings.EqualFold(name, constants.RESPONSE_TIME)
+		isCode := strings.EqualFold(name, constants.StatusCode)
+		if !isTime && !isCode {
+			continue
+		}
+		for r := range resp.Values {
+			if isTime {
+				resp.Values[r].Columns[col] = elapsed
+			}
+			if isCode {
+				resp.Values[r].Columns[col] = code
+			}
+		}
+	}
+}
+
+// getTimeout converts timeoutStr, a number of milliseconds, into a Duration.
+// An empty, non-numeric, zero or negative value falls back to 10 seconds:
+// http.Client treats a zero Timeout as "no timeout", which would let a
+// collection hang indefinitely, and a negative one can never succeed.
+func (hc *HTTPCollector) getTimeout(timeoutStr string) time.Duration {
+	const defaultTimeout = 10 * time.Second
+
+	val, err := strconv.Atoi(strings.TrimSpace(timeoutStr))
+	if err != nil || val <= 0 {
+		return defaultTimeout
+	}
+	return time.Duration(val) * time.Millisecond
+}
+
+// createSuccessResponse returns a response for metrics marked as successful,
+// stamped with the current time and holding an empty (non-nil) row list.
+func (hc *HTTPCollector) createSuccessResponse(metrics *job.Metrics) *job.CollectRepMetricsData {
+	resp := new(job.CollectRepMetricsData)
+	resp.Metrics = metrics.Name
+	resp.Time = time.Now().UnixMilli()
+	resp.Code = constants.CollectSuccess
+	resp.Msg = "success"
+	resp.Values = []job.ValueRow{}
+	return resp
+}
+
+// createFailResponse returns a response for metrics carrying the given failure
+// code and message, stamped with the current time. No rows are attached.
+func (hc *HTTPCollector) createFailResponse(metrics *job.Metrics, code int, msg string) *job.CollectRepMetricsData {
+	failure := new(job.CollectRepMetricsData)
+	failure.Metrics = metrics.Name
+	failure.Time = time.Now().UnixMilli()
+	failure.Code = code
+	failure.Msg = msg
+	return failure
+}
diff --git a/internal/collector/basic/init.go b/internal/collector/basic/init.go
index 68a282e..de47b58 100644
--- a/internal/collector/basic/init.go
+++ b/internal/collector/basic/init.go
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -21,6 +21,7 @@ package basic
import (
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/database"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/http"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/ssh"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
@@ -39,10 +40,15 @@ func init() {
return ssh.NewSSHCollector(logger)
})
+ strategy.RegisterFactory("http", func(logger logger.Logger)
strategy.Collector {
+ return http.NewHTTPCollector(logger)
+ })
+
+ strategy.RegisterFactory("https", func(logger logger.Logger)
strategy.Collector {
+ return http.NewHTTPCollector(logger)
+ })
+
// More protocols can be added here in the future:
- // strategy.RegisterFactory("http", func(logger logger.Logger)
strategy.Collector {
- // return http.NewHTTPCollector(logger)
- // })
// strategy.RegisterFactory("redis", func(logger logger.Logger)
strategy.Collector {
// return redis.NewRedisCollector(logger)
// })
diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go
b/internal/collector/common/collect/dispatch/metrics_collector.go
index c4aea7e..131f13d 100644
--- a/internal/collector/common/collect/dispatch/metrics_collector.go
+++ b/internal/collector/common/collect/dispatch/metrics_collector.go
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -218,12 +218,6 @@ func (mc *MetricsCollector) CalculateFields(metrics
*jobtypes.Metrics, result *j
}
// parseCalculates parses the calculates configuration and compiles expressions
-// Returns a map of field name to compiled expression program, and a map of
field name to alias field
-// Supports two formats:
-// 1. String array format: []string - "field=expression" or "field=aliasField"
-// Example: "usage=(used / total) * 100" or "total=total"
-// 2. Calculate struct array format: []Calculate - {field: "field", script:
"expression", aliasField: "alias"}
-// 3. Mixed interface array: []interface{} - can contain both strings and
Calculate structs
func (mc *MetricsCollector) parseCalculates(calculates interface{})
(map[string]*vm.Program, map[string]string) {
fieldExpressionMap := make(map[string]*vm.Program)
fieldAliasMap := make(map[string]string)
@@ -313,11 +307,6 @@ func (mc *MetricsCollector) parseCalculates(calculates
interface{}) (map[string]
}
// parseUnits parses the units configuration for unit conversions
-// Supports three formats:
-// 1. String array format: []string - "fieldName=originUnit->newUnit"
-// Example: "committed=B->MB"
-// 2. Unit struct array format: []Unit - {field: "field", unit:
"originUnit->newUnit"}
-// 3. Mixed interface array: []interface{} - can contain both strings and
Unit structs
func (mc *MetricsCollector) parseUnits(units interface{})
map[string]*unit.Conversion {
fieldUnitConversionMap := make(map[string]*unit.Conversion)
@@ -390,8 +379,6 @@ func (mc *MetricsCollector) parseUnits(units interface{})
map[string]*unit.Conve
}
// parseUnitsFromStringArray parses units from string array format
-// Format: "fieldName=originUnit->newUnit"
-// Example: "committed=B->MB"
func (mc *MetricsCollector) parseUnitsFromStringArray(units []string,
fieldUnitConversionMap map[string]*unit.Conversion) {
for _, unitStr := range units {
conversion, err := unit.ParseConversion(unitStr)
@@ -426,10 +413,6 @@ func (mc *MetricsCollector)
parseUnitsFromStructArray(units []jobtypes.Unit, fie
}
// parseCalculatesFromStringArray parses calculates from string array format
-// Format: "field=expression" where expression can be a simple field name or a
calculation
-// Examples:
-// - "total=total" (simple field mapping)
-// - "usage=(used / total) * 100" (expression calculation)
func (mc *MetricsCollector) parseCalculatesFromStringArray(calculates []string,
fieldExpressionMap map[string]*vm.Program, fieldAliasMap
map[string]string) {
@@ -484,7 +467,6 @@ func (mc *MetricsCollector)
parseCalculatesFromStructArray(calculates []jobtypes
}
// splitCalculateString splits "field=expression" into [field, expression]
-// Simply splits by the first '=' character
func splitCalculateString(calc string) []string {
idx := strings.Index(calc, "=")
if idx == -1 {
@@ -538,6 +520,33 @@ func (mc *MetricsCollector) CollectMetrics(metrics
*jobtypes.Metrics, job *jobty
startTime := time.Now()
+ // Populate metrics.ConfigMap from job.Configmap if missing
+ if metrics.ConfigMap == nil {
+ metrics.ConfigMap = make(map[string]string)
+ }
+ // Merge global job config params into metrics config map
+ if job.Configmap != nil {
+ for _, param := range job.Configmap {
+ // Fix: Handle nil value gracefully to avoid
"<nil>" literal in string
+ var valStr string
+ if param.Value == nil {
+ valStr = ""
+ } else {
+ valStr = fmt.Sprintf("%v", param.Value)
+ }
+ metrics.ConfigMap[param.Key] = valStr
+ }
+ }
+
+ // Perform parameter replacement using the centralized Replacer
+ // This handles all protocols (HTTP, SSH, JDBC, etc.)
+ replacer := param.NewReplacer()
+ if err := replacer.ReplaceMetricsParams(metrics,
metrics.ConfigMap); err != nil {
+ mc.logger.Error(err, "failed to replace params",
"metricsName", metrics.Name)
+ resultChan <- mc.createErrorResponse(metrics, job,
constants.CollectFail, fmt.Sprintf("Param replacement error: %v", err))
+ return
+ }
+
// Get the appropriate collector based on protocol
collector, err := strategy.CollectorFor(metrics.Protocol)
if err != nil {
@@ -550,7 +559,7 @@ func (mc *MetricsCollector) CollectMetrics(metrics
*jobtypes.Metrics, job *jobty
return
}
- // Perform the actual collection with metrics (parameters
should already be replaced by CommonDispatcher)
+ // Perform the actual collection
result := collector.Collect(metrics)
// calculate fields and convert units by metrics.Calculates and
metrics.Units
diff --git a/internal/collector/common/types/job/metrics_types.go
b/internal/collector/common/types/job/metrics_types.go
index 6474be1..293811a 100644
--- a/internal/collector/common/types/job/metrics_types.go
+++ b/internal/collector/common/types/job/metrics_types.go
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -169,17 +169,27 @@ type ValueRow struct {
// HTTPProtocol represents HTTP protocol configuration
type HTTPProtocol struct {
- URL string `json:"url"`
- Method string `json:"method"`
- Headers map[string]string `json:"headers"`
- Params map[string]string `json:"params"`
- Body string `json:"body"`
- ParseScript string `json:"parseScript"`
- ParseType string `json:"parseType"`
- Keyword string `json:"keyword"`
- Username string `json:"username"`
- Password string `json:"password"`
- SSL bool `json:"ssl"`
+ URL string `json:"url"`
+ Method string `json:"method"`
+ Headers map[string]string `json:"headers"`
+ Params map[string]string `json:"params"`
+ Body string `json:"body"`
+ ParseScript string `json:"parseScript"`
+ ParseType string `json:"parseType"`
+ Keyword string `json:"keyword"`
+ Timeout string `json:"timeout"`
+ SSL string `json:"ssl"`
+ Authorization *Authorization `json:"authorization"`
+}
+
+// Authorization represents HTTP authorization configuration
+type Authorization struct {
+ Type string `json:"type"`
+ BasicAuthUsername string `json:"basicAuthUsername"`
+ BasicAuthPassword string `json:"basicAuthPassword"`
+ DigestAuthUsername string `json:"digestAuthUsername"`
+ DigestAuthPassword string `json:"digestAuthPassword"`
+ BearerTokenToken string `json:"bearerTokenToken"`
}
// SSHProtocol represents SSH protocol configuration
@@ -341,6 +351,28 @@ func (j *Job) Clone() *Job {
clone.Metrics[i].AliasFields = make([]string,
len(metric.AliasFields))
copy(clone.Metrics[i].AliasFields,
metric.AliasFields)
}
+
+ // Deep copy HTTP Protocol if exists
+ if metric.HTTP != nil {
+ cloneHTTP := *metric.HTTP
+ if metric.HTTP.Headers != nil {
+ cloneHTTP.Headers =
make(map[string]string, len(metric.HTTP.Headers))
+ for k, v := range metric.HTTP.Headers {
+ cloneHTTP.Headers[k] = v
+ }
+ }
+ if metric.HTTP.Params != nil {
+ cloneHTTP.Params =
make(map[string]string, len(metric.HTTP.Params))
+ for k, v := range metric.HTTP.Params {
+ cloneHTTP.Params[k] = v
+ }
+ }
+ if metric.HTTP.Authorization != nil {
+ cloneAuth := *metric.HTTP.Authorization
+ cloneHTTP.Authorization = &cloneAuth
+ }
+ clone.Metrics[i].HTTP = &cloneHTTP
+ }
}
}
diff --git a/internal/util/param/param_replacer.go
b/internal/util/param/param_replacer.go
index 4346e05..5e95eed 100644
--- a/internal/util/param/param_replacer.go
+++ b/internal/util/param/param_replacer.go
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -86,7 +86,7 @@ func (r *Replacer) ReplaceJobParams(job *jobtypes.Job)
(*jobtypes.Job, error) {
// Replace parameters in all metrics
for i := range clonedJob.Metrics {
- if err := r.replaceMetricsParams(&clonedJob.Metrics[i],
paramMap); err != nil {
+ if err := r.ReplaceMetricsParams(&clonedJob.Metrics[i],
paramMap); err != nil {
return nil, fmt.Errorf("failed to replace params in
metrics %s: %w", clonedJob.Metrics[i].Name, err)
}
}
@@ -126,74 +126,28 @@ func (r *Replacer) createParamMapSimple(configmap
[]jobtypes.Configmap) map[stri
return paramMap
}
-// createParamMap creates a parameter map from configmap entries (legacy
version with decryption)
-// Deprecated: Use PreprocessJobPasswords + createParamMapSafe instead
-func (r *Replacer) createParamMap(configmap []jobtypes.Configmap)
map[string]string {
- paramMap := make(map[string]string)
-
- for _, config := range configmap {
- // Convert value to string based on type, handle null values as
empty strings
- var strValue string
- if config.Value == nil {
- strValue = "" // null values become empty strings
- } else {
- switch config.Type {
- case 0: // number
- if numVal, ok := config.Value.(float64); ok {
- strValue = strconv.FormatFloat(numVal,
'f', -1, 64)
- } else if intVal, ok := config.Value.(int); ok {
- strValue = strconv.Itoa(intVal)
- } else {
- strValue = fmt.Sprintf("%v",
config.Value)
- }
- case 1: // string
- strValue = fmt.Sprintf("%v", config.Value)
- case 2: // password (encrypted)
- if encryptedValue, ok := config.Value.(string);
ok {
- log := logger.DefaultLogger(os.Stdout,
loggertype.LogLevelDebug).WithName("param-replacer")
- log.Sugar().Debugf("attempting to
decrypt password: %s", encryptedValue)
- if decoded, err :=
r.decryptPassword(encryptedValue); err == nil {
- log.Info("password decryption
successful", "length", len(decoded))
- strValue = decoded
- } else {
- log.Error(err, "password
decryption failed, using original value")
- // Fallback to original value
if decryption fails
- strValue = encryptedValue
- }
- } else {
- strValue = fmt.Sprintf("%v",
config.Value)
- }
- default:
- strValue = fmt.Sprintf("%v", config.Value)
- }
- }
- paramMap[config.Key] = strValue
- }
-
- return paramMap
-}
-
-// replaceMetricsParams replaces parameters in metrics configuration
-func (r *Replacer) replaceMetricsParams(metrics *jobtypes.Metrics, paramMap
map[string]string) error {
- // Replace parameters in protocol-specific configurations
- // Currently only JDBC is defined as interface{} in the struct
definition
- // Other protocols are concrete struct pointers and don't contain
placeholders from JSON
+// ReplaceMetricsParams replaces parameters in metrics configuration
+// Exported method to be called by MetricsCollector
+func (r *Replacer) ReplaceMetricsParams(metrics *jobtypes.Metrics, paramMap
map[string]string) error {
+ // 1. JDBC
if metrics.JDBC != nil {
if err := r.replaceProtocolParams(&metrics.JDBC, paramMap); err
!= nil {
return fmt.Errorf("failed to replace JDBC params: %w",
err)
}
}
- // TODO: Add other protocol configurations as they are converted to
interface{} types
- // For now, other protocols (HTTP, SSH, etc.) are concrete struct
pointers and
- // don't require parameter replacement
-
+ // 2. SSH
if metrics.SSH != nil {
if err := r.replaceProtocolParams(&metrics.SSH, paramMap); err
!= nil {
return fmt.Errorf("failed to replace SSH params: %w",
err)
}
}
+ // 3. HTTP
+ if metrics.HTTP != nil {
+ r.replaceHTTPParams(metrics.HTTP, paramMap)
+ }
+
// Replace parameters in basic metrics fields
if err := r.replaceBasicMetricsParams(metrics, paramMap); err != nil {
return fmt.Errorf("failed to replace basic metrics params: %w",
err)
@@ -202,7 +156,55 @@ func (r *Replacer) replaceMetricsParams(metrics
*jobtypes.Metrics, paramMap map[
return nil
}
-// replaceProtocolParams replaces parameters in any protocol configuration
+// replaceHTTPParams specific replacement logic for HTTPProtocol struct
+func (r *Replacer) replaceHTTPParams(http *jobtypes.HTTPProtocol, paramMap
map[string]string) {
+ http.URL = r.replaceParamPlaceholders(http.URL, paramMap)
+ http.Method = r.replaceParamPlaceholders(http.Method, paramMap)
+ http.Body = r.replaceParamPlaceholders(http.Body, paramMap)
+ http.ParseScript = r.replaceParamPlaceholders(http.ParseScript,
paramMap)
+ http.ParseType = r.replaceParamPlaceholders(http.ParseType, paramMap)
+ http.Keyword = r.replaceParamPlaceholders(http.Keyword, paramMap)
+ http.Timeout = r.replaceParamPlaceholders(http.Timeout, paramMap)
+ http.SSL = r.replaceParamPlaceholders(http.SSL, paramMap)
+
+ // Headers
+ if http.Headers != nil {
+ newHeaders := make(map[string]string)
+ for k, v := range http.Headers {
+ newKey := r.replaceParamPlaceholders(k, paramMap)
+ // Filter out keys that are empty or still contain
placeholders
+ if newKey != "" && !strings.Contains(newKey, "^_^") {
+ newHeaders[newKey] =
r.replaceParamPlaceholders(v, paramMap)
+ }
+ }
+ http.Headers = newHeaders
+ }
+
+ // Params
+ if http.Params != nil {
+ newParams := make(map[string]string)
+ for k, v := range http.Params {
+ newKey := r.replaceParamPlaceholders(k, paramMap)
+ if newKey != "" && !strings.Contains(newKey, "^_^") {
+ newParams[newKey] =
r.replaceParamPlaceholders(v, paramMap)
+ }
+ }
+ http.Params = newParams
+ }
+
+ // Authorization
+ if http.Authorization != nil {
+ auth := http.Authorization
+ auth.Type = r.replaceParamPlaceholders(auth.Type, paramMap)
+ auth.BasicAuthUsername =
r.replaceParamPlaceholders(auth.BasicAuthUsername, paramMap)
+ auth.BasicAuthPassword =
r.replaceParamPlaceholders(auth.BasicAuthPassword, paramMap)
+ auth.DigestAuthUsername =
r.replaceParamPlaceholders(auth.DigestAuthUsername, paramMap)
+ auth.DigestAuthPassword =
r.replaceParamPlaceholders(auth.DigestAuthPassword, paramMap)
+ auth.BearerTokenToken =
r.replaceParamPlaceholders(auth.BearerTokenToken, paramMap)
+ }
+}
+
+// replaceProtocolParams replaces parameters in any protocol configuration
defined as interface{}
func (r *Replacer) replaceProtocolParams(protocolInterface *interface{},
paramMap map[string]string) error {
if *protocolInterface == nil {
return nil
@@ -211,11 +213,9 @@ func (r *Replacer) replaceProtocolParams(protocolInterface
*interface{}, paramMa
// Convert protocol interface{} to map for manipulation
protocolMap, ok := (*protocolInterface).(map[string]interface{})
if !ok {
- // If it's already processed or not a map, skip
return nil
}
- // Recursively replace parameters in all string values
return r.replaceParamsInMap(protocolMap, paramMap)
}
@@ -224,15 +224,12 @@ func (r *Replacer) replaceParamsInMap(data
map[string]interface{}, paramMap map[
for key, value := range data {
switch v := value.(type) {
case string:
- // Replace parameters in string values
data[key] = r.replaceParamPlaceholders(v, paramMap)
case map[string]interface{}:
- // Recursively handle nested maps
if err := r.replaceParamsInMap(v, paramMap); err != nil
{
return fmt.Errorf("failed to replace params in
nested map %s: %w", key, err)
}
case []interface{}:
- // Handle arrays
for i, item := range v {
if itemMap, ok :=
item.(map[string]interface{}); ok {
if err := r.replaceParamsInMap(itemMap,
paramMap); err != nil {
@@ -249,41 +246,35 @@ func (r *Replacer) replaceParamsInMap(data
map[string]interface{}, paramMap map[
// replaceBasicMetricsParams replaces parameters in basic metrics fields
func (r *Replacer) replaceBasicMetricsParams(metrics *jobtypes.Metrics,
paramMap map[string]string) error {
- // Replace parameters in basic string fields
metrics.Host = r.replaceParamPlaceholders(metrics.Host, paramMap)
metrics.Port = r.replaceParamPlaceholders(metrics.Port, paramMap)
metrics.Timeout = r.replaceParamPlaceholders(metrics.Timeout, paramMap)
metrics.Range = r.replaceParamPlaceholders(metrics.Range, paramMap)
- // Replace parameters in ConfigMap
if metrics.ConfigMap != nil {
for key, value := range metrics.ConfigMap {
metrics.ConfigMap[key] =
r.replaceParamPlaceholders(value, paramMap)
}
}
-
return nil
}
// replaceParamPlaceholders replaces ^_^paramName^_^ placeholders with actual
values
func (r *Replacer) replaceParamPlaceholders(template string, paramMap
map[string]string) string {
- // Guard against empty templates to prevent strings.ReplaceAll issues
if template == "" {
return ""
}
-
+ if !strings.Contains(template, "^_^") {
+ return template
+ }
result := template
-
- // Find all ^_^paramName^_^ patterns and replace them
for paramName, paramValue := range paramMap {
- // Guard against empty parameter names
if paramName == "" {
continue
}
placeholder := fmt.Sprintf("^_^%s^_^", paramName)
result = strings.ReplaceAll(result, placeholder, paramValue)
}
-
return result
}
@@ -293,9 +284,7 @@ func (r *Replacer) ExtractProtocolConfig(protocolInterface
interface{}, targetSt
return fmt.Errorf("protocol interface is nil")
}
- // If it's a map (from JSON parsing), convert to target struct
if protocolMap, ok := protocolInterface.(map[string]interface{}); ok {
- // Convert map to JSON and then unmarshal to target struct
jsonData, err := json.Marshal(protocolMap)
if err != nil {
return fmt.Errorf("failed to marshal protocol config:
%w", err)
@@ -311,87 +300,65 @@ func (r *Replacer)
ExtractProtocolConfig(protocolInterface interface{}, targetSt
return fmt.Errorf("unsupported protocol config type: %T",
protocolInterface)
}
-// ExtractJDBCConfig extracts and processes JDBC configuration after parameter
replacement
+// ExtractJDBCConfig extracts and processes JDBC configuration
func (r *Replacer) ExtractJDBCConfig(jdbcInterface interface{})
(*jobtypes.JDBCProtocol, error) {
if jdbcInterface == nil {
return nil, nil
}
-
- // If it's already a JDBCProtocol struct
if jdbcConfig, ok := jdbcInterface.(*jobtypes.JDBCProtocol); ok {
return jdbcConfig, nil
}
-
- // Use the generic extraction method
var jdbcConfig jobtypes.JDBCProtocol
if err := r.ExtractProtocolConfig(jdbcInterface, &jdbcConfig); err !=
nil {
return nil, fmt.Errorf("failed to extract JDBC config: %w", err)
}
-
return &jdbcConfig, nil
}
-// ExtractHTTPConfig extracts and processes HTTP configuration after parameter
replacement
+// ExtractHTTPConfig extracts and processes HTTP configuration
func (r *Replacer) ExtractHTTPConfig(httpInterface interface{})
(*jobtypes.HTTPProtocol, error) {
if httpInterface == nil {
return nil, nil
}
-
- // If it's already an HTTPProtocol struct
if httpConfig, ok := httpInterface.(*jobtypes.HTTPProtocol); ok {
return httpConfig, nil
}
-
- // Use the generic extraction method
var httpConfig jobtypes.HTTPProtocol
if err := r.ExtractProtocolConfig(httpInterface, &httpConfig); err !=
nil {
return nil, fmt.Errorf("failed to extract HTTP config: %w", err)
}
-
return &httpConfig, nil
}
-// ExtractSSHConfig extracts and processes SSH configuration after parameter
replacement
+// ExtractSSHConfig extracts and processes SSH configuration
func (r *Replacer) ExtractSSHConfig(sshInterface interface{})
(*jobtypes.SSHProtocol, error) {
if sshInterface == nil {
return nil, nil
}
-
- // If it's already an SSHProtocol struct
if sshConfig, ok := sshInterface.(*jobtypes.SSHProtocol); ok {
return sshConfig, nil
}
-
- // Use the generic extraction method
var sshConfig jobtypes.SSHProtocol
if err := r.ExtractProtocolConfig(sshInterface, &sshConfig); err != nil
{
return nil, fmt.Errorf("failed to extract SSH config: %w", err)
}
-
return &sshConfig, nil
}
// decryptPassword decrypts an encrypted password using AES
-// This implements the same algorithm as Java manager's AESUtil
func (r *Replacer) decryptPassword(encryptedPassword string) (string, error) {
log := logger.DefaultLogger(os.Stdout,
loggertype.LogLevelDebug).WithName("password-decrypt")
-
- // Use the AES utility (matches Java: AesUtil.aesDecode)
if result, err := crypto.AesDecode(encryptedPassword); err == nil {
log.Info("password decrypted successfully", "length",
len(result))
return result, nil
} else {
log.Sugar().Debugf("primary decryption failed: %v", err)
}
-
- // Fallback: try with default Java key if no manager key is set
defaultKey := "tomSun28HaHaHaHa"
if result, err := crypto.AesDecodeWithKey(encryptedPassword,
defaultKey); err == nil {
log.Info("password decrypted with default key", "length",
len(result))
return result, nil
}
-
- // If all decryption attempts fail, return original value to allow
system to continue
log.Info("all decryption attempts failed, using original value")
return encryptedPassword, nil
}
@@ -402,42 +369,28 @@ type DoubleAndUnit struct {
Unit string
}
-// Common unit symbols to check for
-// Order matters: longer strings should come before shorter ones to avoid
partial matches
-// e.g., "Ki" before "K", "Mi" before "M", "Gi" before "G"
var unitSymbols = []string{
"%", "Gi", "Mi", "Ki", "G", "g", "M", "m", "K", "k", "B", "b",
}
// ExtractDoubleAndUnitFromStr extracts a double value and unit from a string
-// Examples:
-// - "123.45" -> {Value: 123.45, Unit: ""}
-// - "23.43GB" -> {Value: 23.43, Unit: "GB"}
-// - "33KB" -> {Value: 33, Unit: "KB"}
-// - "99%" -> {Value: 99, Unit: "%"}
-// - "MB" -> {Value: 0, Unit: "MB"}
func (r *Replacer) ExtractDoubleAndUnitFromStr(str string) *DoubleAndUnit {
if str == "" {
return nil
}
-
str = strings.TrimSpace(str)
doubleAndUnit := &DoubleAndUnit{}
-
if value, err := strconv.ParseFloat(str, 64); err == nil {
doubleAndUnit.Value = value
return doubleAndUnit
}
-
for _, unitSymbol := range unitSymbols {
index := strings.Index(str, unitSymbol)
-
if index == 0 {
doubleAndUnit.Value = 0
doubleAndUnit.Unit = strings.TrimSpace(str)
return doubleAndUnit
}
-
if index > 0 {
numericPart := str[:index]
if value, err :=
strconv.ParseFloat(strings.TrimSpace(numericPart), 64); err == nil {
@@ -447,6 +400,5 @@ func (r *Replacer) ExtractDoubleAndUnitFromStr(str string)
*DoubleAndUnit {
}
}
}
-
return nil
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]