This is an automated email from the ASF dual-hosted git repository.
shown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
The following commit(s) were added to refs/heads/main by this push:
new 51f252a refactor: adjust layout (#35)
51f252a is described below
commit 51f252aadf158891826a03c13bcdad3e22744fee
Author: shown <[email protected]>
AuthorDate: Sun Dec 14 14:13:56 2025 +0800
refactor: adjust layout (#35)
---
cmd/main.go | 16 +-
internal/banner/embed.go | 4 +-
internal/cmd/server.go | 16 +-
internal/collector/basic/http/http_collector.go | 596 ---------------------
internal/collector/common/transport/transport.go | 277 ----------
internal/{collector => }/config/config.go | 6 +-
internal/{collector => }/config/config_factory.go | 4 +-
internal/{collector => }/config/config_test.go | 0
internal/{collector => }/config/env_config.go | 4 +-
internal/{collector => }/config/unified_config.go | 4 +-
internal/{collector/basic/dns => job/cache}/.keep | 0
.../collect/dispatch/metrics_collector.go | 6 +-
.../collect/dispatch/metrics_collector_test.go | 4 +-
.../common => job}/collect/lazy_message_router.go | 2 +-
.../collect/metrics/hertzbeat_metrics_collector.go | 0
.../common => job}/collect/result_handler.go | 6 +-
.../collect/strategy/strategy_factory.go | 2 +-
.../common => job}/dispatcher/common_dispatcher.go | 4 +-
.../basic/ftp => job/dispatcher/entrance}/.keep | 0
.../basic/icmp => job/dispatcher/exporter}/.keep | 0
.../dispatcher/hashed_wheel_timer.go | 2 +-
.../common => job}/dispatcher/time_wheel.go | 22 +-
.../common => job}/dispatcher/wheel_timer_task.go | 14 +-
.../common => job}/router/message_router.go | 18 +-
.../common/job => job/server}/job_server.go | 16 +-
.../collector_types.go => metrics/metrics.go} | 6 +-
.../basic/database/jdbc_collector.go | 14 +-
.../basic/database/jdbc_collector_test.go | 0
.../basic/imap => protocol/basic/dns}/.keep | 0
.../basic/ipmi2 => protocol/basic/ftp}/.keep | 0
internal/protocol/basic/http/http_collector.go | 596 +++++++++++++++++++++
.../basic/memcached => protocol/basic/icmp}/.keep | 0
.../basic/modbus => protocol/basic/imap}/.keep | 0
.../basic/mqtt => protocol/basic/ipmi2}/.keep | 0
.../basic/nginx => protocol/basic/memcached}/.keep | 0
.../basic/ntp => protocol/basic/modbus}/.keep | 0
.../basic/plc => protocol/basic/mqtt}/.keep | 0
.../basic/pop3 => protocol/basic/nginx}/.keep | 0
.../basic/prometheus => protocol/basic/ntp}/.keep | 0
.../basic/push => protocol/basic/plc}/.keep | 0
.../basic/redfish => protocol/basic/pop3}/.keep | 0
.../basic/s7 => protocol/basic/prometheus}/.keep | 0
.../basic/script => protocol/basic/push}/.keep | 0
.../basic/sd => protocol/basic/redfish}/.keep | 0
.../basic/redis/redis_collector.go | 14 +-
.../basic/redis/redis_collector_test.go | 6 +-
.../basic/smtp => protocol/basic/s7}/.keep | 0
.../basic/snmp => protocol/basic/script}/.keep | 0
.../basic/telnet => protocol/basic/sd}/.keep | 0
.../basic/udp => protocol/basic/smtp}/.keep | 0
.../basic/websocket => protocol/basic/snmp}/.keep | 0
.../basic/ssh/ssh_collector.go | 6 +-
.../common/cache => protocol/basic/telnet}/.keep | 0
.../entrance => protocol/basic/udp}/.keep | 0
.../exporter => protocol/basic/websocket}/.keep | 0
.../{collector => protocol}/extension/kafka/.keep | 0
.../extension/milvus/milvus_collector.go | 4 +-
.../extension/milvus/milvus_collector_test.go | 44 +-
.../extension/mongodb/.keep | 0
.../extension/nebulagraph/.keep | 0
.../extension/rocketmq/.keep | 0
.../basic => protocol}/standard/imports.go | 10 +-
internal/{collector/common => }/server/server.go | 4 +-
.../{collector/common => }/server/server_test.go | 0
internal/transport/processors.go | 4 +-
internal/transport/transport.go | 276 ++++++++++
.../common => }/types/collector/collector_types.go | 0
.../common => }/types/config/config_types.go | 0
.../common => }/types/err/error_types.go | 0
.../{collector/common => }/types/job/job_types.go | 0
.../common => }/types/job/metrics_types.go | 56 +-
.../types/job/protocol/common_request_protocol.go | 0
.../types/job/protocol/consul_sd_protocol.go | 0
.../types/job/protocol/http_protocol.go | 0
.../types/job/protocol/jdbc_protocol.go | 0
.../common => }/types/job/protocol/jmx_protocol.go | 0
.../types/job/protocol/milvus_protocol.go | 0
.../types/job/protocol/mongodb_protocol.go | 0
.../types/job/protocol/redis_protocol.go | 0
.../types/job/protocol/snmp_protocol.go | 0
.../common => }/types/job/protocol/ssh_protocol.go | 0
.../common => }/types/job/protocol/ssh_tunnel.go | 0
.../types/job/protocol/zookeeper_sd_protocol.go | 0
.../common => }/types/job/timeout_types.go | 0
.../common => }/types/logger/logging_types.go | 0
internal/util/arrow/arrow_serializer.go | 2 +-
internal/util/crypto/aes_util.go | 2 +-
internal/util/logger/logger.go | 107 ++--
internal/util/logger/logger_test.go | 2 +-
internal/util/param/param_replacer.go | 42 +-
internal/util/ssh/ssh_helper.go | 2 +-
internal/util/ssh/ssh_helper_test.go | 4 +-
92 files changed, 1109 insertions(+), 1115 deletions(-)
diff --git a/cmd/main.go b/cmd/main.go
index 2b497b4..8acd589 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -18,17 +18,17 @@
package main
import (
- "fmt"
- "os"
+ "fmt"
+ "os"
- "hertzbeat.apache.org/hertzbeat-collector-go/cmd/root"
- _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/standard"
+ "hertzbeat.apache.org/hertzbeat-collector-go/cmd/root"
+ _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/protocol/standard"
)
func main() {
- if err := root.GetRootCommand().Execute(); err != nil {
- _, _ = fmt.Fprintln(os.Stderr, err)
- os.Exit(1)
- }
+ if err := root.GetRootCommand().Execute(); err != nil {
+ _, _ = fmt.Fprintln(os.Stderr, err)
+ os.Exit(1)
+ }
}
diff --git a/internal/banner/embed.go b/internal/banner/embed.go
index e18bdd1..f3c3aa2 100644
--- a/internal/banner/embed.go
+++ b/internal/banner/embed.go
@@ -24,8 +24,8 @@ import (
"strconv"
"text/template"
- clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
- bannertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/err"
+ clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server"
+ bannertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/err"
)
const (
diff --git a/internal/cmd/server.go b/internal/cmd/server.go
index 687f52a..5980263 100644
--- a/internal/cmd/server.go
+++ b/internal/cmd/server.go
@@ -28,16 +28,16 @@ import (
"syscall"
"github.com/spf13/cobra"
+ cfgloader "hertzbeat.apache.org/hertzbeat-collector-go/internal/config"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect"
+ jobserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/server"
+ clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server"
+ transportserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
+ collectortypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector"
+ configtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
+ collectorerr "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/err"
bannerouter "hertzbeat.apache.org/hertzbeat-collector-go/internal/banner"
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect"
- jobserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/job"
- clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
- transportserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/transport"
- collectortypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector"
- configtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config"
- collectorerr "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/err"
- cfgloader "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config"
)
var (
diff --git a/internal/collector/basic/http/http_collector.go b/internal/collector/basic/http/http_collector.go
deleted file mode 100644
index 32632a5..0000000
--- a/internal/collector/basic/http/http_collector.go
+++ /dev/null
@@ -1,596 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package http
-
-import (
- "bytes"
- "crypto/md5"
- "crypto/rand"
- "crypto/tls"
- "encoding/hex"
- "encoding/json"
- "fmt"
- "io"
- "net/http"
- "strconv"
- "strings"
- "time"
-
- "github.com/prometheus/common/expfmt"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
-)
-
-func init() {
- strategy.RegisterFactory(ProtocolHTTP, func(logger logger.Logger)
strategy.Collector {
- return NewHTTPCollector(logger)
- })
-}
-
-const (
- ProtocolHTTP = "http"
-
- // Parse Types
- ParseTypeDefault = "default"
- ParseTypeJsonPath = "jsonPath"
- ParseTypePrometheus = "prometheus"
- ParseTypeWebsite = "website"
- ParseTypeHeader = "header"
-
- // Auth Types
- AuthTypeBasic = "Basic Auth"
- AuthTypeBearer = "Bearer Token"
- AuthTypeDigest = "Digest Auth"
-)
-
-type HTTPCollector struct {
- logger logger.Logger
-}
-
-func NewHTTPCollector(log logger.Logger) *HTTPCollector {
- return &HTTPCollector{
- logger: log.WithName("http-collector"),
- }
-}
-
-func (hc *HTTPCollector) Protocol() string {
- return ProtocolHTTP
-}
-
-func (hc *HTTPCollector) PreCheck(metrics *job.Metrics) error {
- if metrics == nil || metrics.HTTP == nil {
- return fmt.Errorf("http configuration is missing")
- }
- return nil
-}
-
-func (hc *HTTPCollector) Collect(metrics *job.Metrics)
*job.CollectRepMetricsData {
- start := time.Now()
-
- // MetricsCollector has already performed parameter replacement on
metrics.HTTP
- httpConfig := metrics.HTTP
-
- // 1. Prepare URL
- targetURL := httpConfig.URL
-
- // Parse SSL bool string
- isSSL := false
- if httpConfig.SSL != "" {
- if val, err := strconv.ParseBool(httpConfig.SSL); err == nil {
- isSSL = val
- }
- }
-
- if targetURL == "" {
- schema := "http"
- if isSSL {
- schema = "https"
- }
- // Use metrics.ConfigMap values if URL is empty
- // Note: metrics.ConfigMap is already populated with job.Configmap values
- host := metrics.ConfigMap["host"]
- port := metrics.ConfigMap["port"]
- if host != "" && port != "" {
- targetURL = fmt.Sprintf("%s://%s:%s", schema, host, port)
- }
- }
-
- if targetURL != "" && !strings.HasPrefix(targetURL, "http") {
- schema := "http"
- if isSSL {
- schema = "https"
- }
- targetURL = fmt.Sprintf("%s://%s", schema, targetURL)
- }
-
- if targetURL == "" {
- return hc.createFailResponse(metrics, constants.CollectFail, "target URL
is empty")
- }
-
- // 2. Create Request
- req, err := hc.createRequest(httpConfig, targetURL)
- if err != nil {
- return hc.createFailResponse(metrics, constants.CollectFail,
fmt.Sprintf("failed to create request: %v", err))
- }
-
- // 3. Create Client
- timeoutStr := metrics.Timeout
- if httpConfig.Timeout != "" {
- timeoutStr = httpConfig.Timeout
- }
-
- timeout := hc.getTimeout(timeoutStr)
- client := &http.Client{
- Timeout: timeout,
- Transport: &http.Transport{
- TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
- },
- }
-
- // 4. Execute Request (with Digest retry logic)
- resp, err := client.Do(req)
-
- // Handle Digest Auth Challenge (401)
- if err == nil && resp.StatusCode == http.StatusUnauthorized &&
- httpConfig.Authorization != nil &&
- strings.EqualFold(httpConfig.Authorization.Type, AuthTypeDigest) {
-
- // Close the first response body
- io.Copy(io.Discard, resp.Body)
- resp.Body.Close()
-
- authHeader := resp.Header.Get("WWW-Authenticate")
- if authHeader != "" {
- // Create new request with Authorization header
- digestReq, digestErr := hc.handleDigestAuth(req, authHeader,
httpConfig.Authorization)
- if digestErr == nil {
- // Retry request
- resp, err = client.Do(digestReq)
- } else {
- hc.logger.Error(digestErr, "failed to handle digest auth")
- }
- }
- }
-
- if err != nil {
- return hc.createFailResponse(metrics, constants.CollectUnReachable,
fmt.Sprintf("request failed: %v", err))
- }
- defer resp.Body.Close()
-
- responseTime := time.Since(start).Milliseconds()
-
- // 5. Parse Response
- parseType := httpConfig.ParseType
- if parseType == "" {
- parseType = ParseTypeDefault
- }
-
- var responseData *job.CollectRepMetricsData
-
- // Read body
- bodyBytes, err := io.ReadAll(resp.Body)
- if err != nil {
- return hc.createFailResponse(metrics, constants.CollectFail,
fmt.Sprintf("failed to read body: %v", err))
- }
-
- switch parseType {
- case ParseTypePrometheus:
- responseData, err = hc.parsePrometheus(bodyBytes, metrics)
- case ParseTypeWebsite:
- responseData, err = hc.parseWebsite(bodyBytes, resp.StatusCode,
responseTime, metrics, httpConfig)
- case ParseTypeHeader:
- responseData, err = hc.parseHeader(resp.Header, metrics)
- case ParseTypeJsonPath, ParseTypeDefault:
- responseData, err = hc.parseJsonPath(bodyBytes, metrics, responseTime,
httpConfig)
- default:
- responseData, err = hc.parseJsonPath(bodyBytes, metrics, responseTime,
httpConfig)
- }
-
- if err != nil {
- hc.logger.Error(err, "parse response failed", "type", parseType)
- return hc.createFailResponse(metrics, constants.CollectFail,
fmt.Sprintf("parse error: %v", err))
- }
-
- hc.fillCommonFields(responseData, metrics.AliasFields, responseTime,
resp.StatusCode)
-
- return responseData
-}
-
-// handleDigestAuth generates a new request with the Digest Authorization
header
-func (hc *HTTPCollector) handleDigestAuth(originalReq *http.Request,
authHeader string, authConfig *protocol.Authorization) (*http.Request, error) {
- params := hc.parseAuthHeader(authHeader)
- realm := params["realm"]
- nonce := params["nonce"]
- qop := params["qop"]
- opaque := params["opaque"]
- algorithm := params["algorithm"]
- if algorithm == "" {
- algorithm = "MD5"
- }
-
- if realm == "" || nonce == "" {
- return nil, fmt.Errorf("missing realm or nonce in digest header")
- }
-
- username := authConfig.DigestAuthUsername
- password := authConfig.DigestAuthPassword
- ha1 := hc.md5Hex(fmt.Sprintf("%s:%s:%s", username, realm, password))
-
- method := originalReq.Method
- uri := originalReq.URL.RequestURI()
- ha2 := hc.md5Hex(fmt.Sprintf("%s:%s", method, uri))
-
- nc := "00000001"
- cnonce := hc.generateCnonce()
-
- var responseStr string
- if qop == "" {
- responseStr = hc.md5Hex(fmt.Sprintf("%s:%s:%s", ha1, nonce, ha2))
- } else if strings.Contains(qop, "auth") {
- responseStr = hc.md5Hex(fmt.Sprintf("%s:%s:%s:%s:%s:%s", ha1, nonce, nc,
cnonce, "auth", ha2))
- } else {
- return nil, fmt.Errorf("unsupported qop: %s", qop)
- }
-
- authVal := fmt.Sprintf(`Digest username="%s", realm="%s", nonce="%s",
uri="%s", response="%s", algorithm="%s"`,
- username, realm, nonce, uri, responseStr, algorithm)
-
- if opaque != "" {
- authVal += fmt.Sprintf(`, opaque="%s"`, opaque)
- }
- if qop != "" {
- authVal += fmt.Sprintf(`, qop=auth, nc=%s, cnonce="%s"`, nc, cnonce)
- }
-
- var newReq *http.Request
- if originalReq.GetBody != nil {
- body, _ := originalReq.GetBody()
- newReq, _ = http.NewRequest(originalReq.Method, originalReq.URL.String(),
body)
- } else {
- newReq, _ = http.NewRequest(originalReq.Method, originalReq.URL.String(),
nil)
- }
-
- for k, v := range originalReq.Header {
- newReq.Header[k] = v
- }
-
- newReq.Header.Set("Authorization", authVal)
- return newReq, nil
-}
-
-func (hc *HTTPCollector) parseAuthHeader(header string) map[string]string {
- header = strings.TrimPrefix(header, "Digest ")
- result := make(map[string]string)
-
- pairs := strings.Split(header, ",")
- for _, pair := range pairs {
- parts := strings.SplitN(strings.TrimSpace(pair), "=", 2)
- if len(parts) == 2 {
- key := strings.TrimSpace(parts[0])
- val := strings.TrimSpace(parts[1])
- val = strings.Trim(val, "\"")
- result[key] = val
- }
- }
- return result
-}
-
-func (hc *HTTPCollector) md5Hex(s string) string {
- hash := md5.Sum([]byte(s))
- return hex.EncodeToString(hash[:])
-}
-
-func (hc *HTTPCollector) generateCnonce() string {
- b := make([]byte, 8)
- io.ReadFull(rand.Reader, b)
- return hex.EncodeToString(b)
-}
-
-func (hc *HTTPCollector) createRequest(config *protocol.HTTPProtocol,
targetURL string) (*http.Request, error) {
- method := strings.ToUpper(config.Method)
- if method == "" {
- method = "GET"
- }
-
- var body io.Reader
- if config.Body != "" {
- body = strings.NewReader(config.Body)
- }
-
- req, err := http.NewRequest(method, targetURL, body)
- if err != nil {
- return nil, err
- }
-
- for k, v := range config.Headers {
- req.Header.Set(k, v)
- }
-
- if config.Authorization != nil {
- auth := config.Authorization
- if auth.Type == "" || strings.EqualFold(auth.Type, AuthTypeBasic) {
- if auth.BasicAuthUsername != "" && auth.BasicAuthPassword != "" {
- req.SetBasicAuth(auth.BasicAuthUsername, auth.BasicAuthPassword)
- }
- } else if strings.EqualFold(auth.Type, AuthTypeBearer) {
- if auth.BearerTokenToken != "" {
- req.Header.Set("Authorization", "Bearer "+auth.BearerTokenToken)
- }
- }
- }
-
- if len(config.Params) > 0 {
- q := req.URL.Query()
- for k, v := range config.Params {
- q.Add(k, v)
- }
- req.URL.RawQuery = q.Encode()
- }
-
- return req, nil
-}
-
-// --- Parsers ---
-
-func (hc *HTTPCollector) parsePrometheus(body []byte, metrics *job.Metrics)
(*job.CollectRepMetricsData, error) {
- response := hc.createSuccessResponse(metrics)
- var parser expfmt.TextParser
- metricFamilies, err := parser.TextToMetricFamilies(bytes.NewReader(body))
- if err != nil {
- return nil, err
- }
- now := time.Now().UnixMilli()
- for _, mf := range metricFamilies {
- for _, m := range mf.Metric {
- row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))}
- labels := make(map[string]string)
- for _, pair := range m.Label {
- labels[pair.GetName()] = pair.GetValue()
- }
- hasValue := false
- for i, field := range metrics.AliasFields {
- if field == constants.NULL_VALUE {
- continue
- }
- if field == "value" || field == "prom_value" {
- if m.Gauge != nil {
- row.Columns[i] = fmt.Sprintf("%f", m.Gauge.GetValue())
- } else if m.Counter != nil {
- row.Columns[i] = fmt.Sprintf("%f", m.Counter.GetValue())
- } else if m.Untyped != nil {
- row.Columns[i] = fmt.Sprintf("%f", m.Untyped.GetValue())
- }
- hasValue = true
- } else {
- if val, ok := labels[field]; ok {
- row.Columns[i] = val
- hasValue = true
- } else {
- row.Columns[i] = constants.NULL_VALUE
- }
- }
- }
- if hasValue {
- response.Values = append(response.Values, row)
- }
- }
- }
- response.Time = now
- return response, nil
-}
-
-func (hc *HTTPCollector) parseWebsite(body []byte, statusCode int,
responseTime int64, metrics *job.Metrics, httpConfig *protocol.HTTPProtocol)
(*job.CollectRepMetricsData, error) {
- response := hc.createSuccessResponse(metrics)
- row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))}
- keyword := httpConfig.Keyword
- keywordCount := 0
- if keyword != "" {
- keywordCount = strings.Count(string(body), keyword)
- }
- for i, field := range metrics.AliasFields {
- switch strings.ToLower(field) {
- case strings.ToLower(constants.StatusCode):
- row.Columns[i] = strconv.Itoa(statusCode)
- case strings.ToLower(constants.RESPONSE_TIME):
- row.Columns[i] = strconv.FormatInt(responseTime, 10)
- case "keyword":
- row.Columns[i] = strconv.Itoa(keywordCount)
- default:
- row.Columns[i] = constants.NULL_VALUE
- }
- }
- response.Values = append(response.Values, row)
- return response, nil
-}
-
-func (hc *HTTPCollector) parseHeader(header http.Header, metrics *job.Metrics)
(*job.CollectRepMetricsData, error) {
- response := hc.createSuccessResponse(metrics)
- row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))}
- for i, field := range metrics.AliasFields {
- val := header.Get(field)
- if val != "" {
- row.Columns[i] = val
- } else {
- row.Columns[i] = constants.NULL_VALUE
- }
- }
- response.Values = append(response.Values, row)
- return response, nil
-}
-
-func (hc *HTTPCollector) parseJsonPath(body []byte, metrics *job.Metrics,
responseTime int64, httpConfig *protocol.HTTPProtocol)
(*job.CollectRepMetricsData, error) {
- response := hc.createSuccessResponse(metrics)
- var data interface{}
- decoder := json.NewDecoder(bytes.NewReader(body))
- decoder.UseNumber()
- if err := decoder.Decode(&data); err != nil {
- return response, nil
- }
- parseScript := httpConfig.ParseScript
- root := data
- if parseScript != "" && parseScript != "$" {
- root = hc.navigateJson(data, parseScript)
- }
- if root == nil {
- return response, nil
- }
- if arr, ok := root.([]interface{}); ok {
- for _, item := range arr {
- row := hc.extractRow(item, metrics.AliasFields)
- response.Values = append(response.Values, row)
- }
- } else {
- row := hc.extractRow(root, metrics.AliasFields)
- response.Values = append(response.Values, row)
- }
- return response, nil
-}
-
-func (hc *HTTPCollector) navigateJson(data interface{}, path string)
interface{} {
- path = strings.TrimPrefix(path, "$.")
- if path == "" {
- return data
- }
- parts := strings.Split(path, ".")
- current := data
- for _, part := range parts {
- if current == nil {
- return nil
- }
- key := part
- idx := -1
- if i := strings.Index(part, "["); i > -1 && strings.HasSuffix(part, "]") {
- key = part[:i]
- idxStr := part[i+1 : len(part)-1]
- if val, err := strconv.Atoi(idxStr); err == nil {
- idx = val
- }
- }
- if key != "" {
- if m, ok := current.(map[string]interface{}); ok {
- if val, exists := m[key]; exists {
- current = val
- } else {
- return nil
- }
- } else {
- return nil
- }
- }
- if idx > -1 {
- if arr, ok := current.([]interface{}); ok {
- if idx < len(arr) {
- current = arr[idx]
- } else {
- return nil
- }
- } else {
- return nil
- }
- }
- }
- return current
-}
-
-func (hc *HTTPCollector) extractRow(item interface{}, fields []string)
job.ValueRow {
- row := job.ValueRow{Columns: make([]string, len(fields))}
- m, isMap := item.(map[string]interface{})
- for i, field := range fields {
- if strings.EqualFold(field, constants.RESPONSE_TIME) ||
strings.EqualFold(field, constants.StatusCode) {
- row.Columns[i] = constants.NULL_VALUE
- continue
- }
- var val interface{}
- if isMap {
- if v, ok := m[field]; ok {
- val = v
- } else {
- val = hc.navigateJson(item, field)
- }
- }
- if val != nil {
- row.Columns[i] = fmt.Sprintf("%v", val)
- } else {
- row.Columns[i] = constants.NULL_VALUE
- }
- }
- return row
-}
-
-func (hc *HTTPCollector) fillCommonFields(resp *job.CollectRepMetricsData,
fields []string, responseTime int64, statusCode int) {
- if resp == nil {
- return
- }
- if len(resp.Fields) == 0 {
- resp.Fields = make([]job.Field, len(fields))
- for i, f := range fields {
- resp.Fields[i] = job.Field{Field: f, Type: constants.TYPE_STRING}
- }
- }
- if len(resp.Values) == 0 {
- row := job.ValueRow{Columns: make([]string, len(fields))}
- for k := range row.Columns {
- row.Columns[k] = constants.NULL_VALUE
- }
- resp.Values = append(resp.Values, row)
- }
- for i := range resp.Values {
- for j, field := range fields {
- if strings.EqualFold(field, constants.RESPONSE_TIME) {
- resp.Values[i].Columns[j] = strconv.FormatInt(responseTime, 10)
- }
- if strings.EqualFold(field, constants.StatusCode) {
- resp.Values[i].Columns[j] = strconv.Itoa(statusCode)
- }
- }
- }
-}
-
-func (hc *HTTPCollector) getTimeout(timeoutStr string) time.Duration {
- if timeoutStr == "" {
- return 10 * time.Second
- }
- if val, err := strconv.Atoi(timeoutStr); err == nil {
- return time.Duration(val) * time.Millisecond
- }
- return 10 * time.Second
-}
-
-func (hc *HTTPCollector) createSuccessResponse(metrics *job.Metrics)
*job.CollectRepMetricsData {
- return &job.CollectRepMetricsData{
- Metrics: metrics.Name,
- Time: time.Now().UnixMilli(),
- Code: constants.CollectSuccess,
- Msg: "success",
- Values: make([]job.ValueRow, 0),
- }
-}
-
-func (hc *HTTPCollector) createFailResponse(metrics *job.Metrics, code int,
msg string) *job.CollectRepMetricsData {
- return &job.CollectRepMetricsData{
- Metrics: metrics.Name,
- Time: time.Now().UnixMilli(),
- Code: code,
- Msg: msg,
- }
-}
diff --git a/internal/collector/common/transport/transport.go b/internal/collector/common/transport/transport.go
deleted file mode 100644
index f2edf73..0000000
--- a/internal/collector/common/transport/transport.go
+++ /dev/null
@@ -1,277 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package transport
-
-import (
- "context"
- "encoding/json"
- "fmt"
- "os"
-
- pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
- clrserver
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector"
- configtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config"
- loggertypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config"
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
-)
-
-const (
- // DefaultManagerAddr is the default manager server address (Java Netty
default port)
- DefaultManagerAddr = "127.0.0.1:1158"
- // DefaultProtocol is the default communication protocol for Java
compatibility
- DefaultProtocol = "netty"
- // DefaultMode is the default operation mode
- DefaultMode = "public"
- // DefaultIdentity is the default collector identity
- DefaultIdentity = "collector-go"
-)
-
-type Runner struct {
- Config *configtypes.CollectorConfig
- client transport.TransportClient
- jobScheduler transport.JobScheduler
- clrserver.Server
-}
-
-func New(cfg *configtypes.CollectorConfig) *Runner {
- return &Runner{
- Config: cfg,
- Server: clrserver.Server{
- Logger: logger.Logger{}, // Will be initialized properly in Start method
- },
- }
-}
-
-// SetJobScheduler sets the job scheduler for the transport runner
-func (r *Runner) SetJobScheduler(scheduler transport.JobScheduler) {
- r.jobScheduler = scheduler
-}
-
-// NewFromConfig creates a new transport runner from collector configuration
-func NewFromConfig(cfg *configtypes.CollectorConfig) *Runner {
- if cfg == nil {
- return nil
- }
- return New(cfg)
-}
-
-// NewFromEnv creates a new transport runner from environment variables
-func NewFromEnv() *Runner {
- envLoader := config.NewEnvConfigLoader()
- cfg := envLoader.LoadFromEnv()
- return NewFromConfig(cfg)
-}
-
-// NewFromUnifiedConfig creates a new transport runner using unified
configuration loading
-// It loads from file first, then overrides with environment variables
-func NewFromUnifiedConfig(cfgPath string) (*Runner, error) {
- unifiedLoader := config.NewUnifiedConfigLoader(cfgPath)
- cfg, err := unifiedLoader.Load()
- if err != nil {
- return nil, err
- }
- return NewFromConfig(cfg), nil
-}
-
-func (r *Runner) Start(ctx context.Context) error {
- // 初始化 Logger 如果它还没有被设置
- if r.Logger.IsZero() {
- r.Logger = logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo)
- }
- r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner",
r.Info().Name)
- r.Logger.Info("Starting transport client")
-
- // 构建 server 地址
- addr := fmt.Sprintf("%s:%s", r.Config.Collector.Manager.Host,
r.Config.Collector.Manager.Port)
- if addr == ":" {
- // 如果配置为空,使用环境变量或默认值
- if v := os.Getenv("MANAGER_ADDR"); v != "" {
- addr = v
- } else {
- addr = DefaultManagerAddr
- }
- }
-
- // 确定协议
- protocol := r.Config.Collector.Manager.Protocol
- if protocol == "" {
- if v := os.Getenv("MANAGER_PROTOCOL"); v != "" {
- protocol = v
- } else {
- protocol = DefaultProtocol
- }
- }
-
- r.Logger.Info("Connecting to manager server", "addr", addr, "protocol",
protocol)
-
- // 创建客户端
- factory := &transport.TransportClientFactory{}
- client, err := factory.CreateClient(protocol, addr)
- if err != nil {
- r.Logger.Error(err, "Failed to create transport client")
- return err
- }
-
- // Set the identity on the client if it supports it
- identity := r.Config.Collector.Identity
- if identity == "" {
- identity = DefaultIdentity
- }
-
- if nettyClient, ok := client.(*transport.NettyClient); ok {
- nettyClient.SetIdentity(identity)
- }
-
- r.client = client
-
- // 设置事件处理器
- switch c := client.(type) {
- case *transport.GrpcClient:
- c.SetEventHandler(func(event transport.Event) {
- switch event.Type {
- case transport.EventConnected:
- r.Logger.Info("Connected to manager gRPC server", "addr",
event.Address)
- go r.sendOnlineMessage()
- case transport.EventDisconnected:
- r.Logger.Info("Disconnected from manager gRPC server", "addr",
event.Address)
- case transport.EventConnectFailed:
- r.Logger.Error(event.Error, "Failed to connect to manager gRPC
server", "addr", event.Address)
- }
- })
- // Register processors with job scheduler
- if r.jobScheduler != nil {
- transport.RegisterDefaultProcessors(c, r.jobScheduler)
- r.Logger.Info("Registered gRPC processors with job scheduler")
- } else {
- transport.RegisterDefaultProcessors(c, nil)
- r.Logger.Info("Registered gRPC processors without job scheduler")
- }
- case *transport.NettyClient:
- c.SetEventHandler(func(event transport.Event) {
- switch event.Type {
- case transport.EventConnected:
- r.Logger.Info("Connected to manager netty server", "addr",
event.Address)
- go r.sendOnlineMessage()
- case transport.EventDisconnected:
- r.Logger.Info("Disconnected from manager netty server", "addr",
event.Address)
- case transport.EventConnectFailed:
- r.Logger.Error(event.Error, "Failed to connect to manager netty
server", "addr", event.Address)
- }
- })
- // Register processors with job scheduler
- if r.jobScheduler != nil {
- transport.RegisterDefaultNettyProcessors(c, r.jobScheduler)
- r.Logger.Info("Registered netty processors with job scheduler")
- } else {
- transport.RegisterDefaultNettyProcessors(c, nil)
- r.Logger.Info("Registered netty processors without job scheduler")
- }
- }
-
- if err := r.client.Start(); err != nil {
- r.Logger.Error(err, "Failed to start transport client")
- return err
- }
-
- // 创建新的context用于监控关闭信号
- ctx, cancel := context.WithCancel(ctx)
- defer cancel()
-
- // 监听 ctx.Done 优雅关闭
- go func() {
- <-ctx.Done()
- r.Logger.Info("Shutting down transport client...")
- _ = r.client.Shutdown()
- }()
-
- // 阻塞直到 ctx.Done
- <-ctx.Done()
- return nil
-}
-
-func (r *Runner) sendOnlineMessage() {
- if r.client != nil && r.client.IsStarted() {
- // Use the configured identity
- identity := r.Config.Collector.Identity
- if identity == "" {
- identity = DefaultIdentity
- }
-
- // Create CollectorInfo JSON structure as expected by Java server
- mode := r.Config.Collector.Mode
- if mode == "" {
- mode = DefaultMode // Default mode as in Java version
- }
-
- collectorInfo := map[string]interface{}{
- "name": identity,
- "ip": "", // Let server detect IP
- "version": "1.0.0",
- "mode": mode,
- }
-
- // Convert to JSON bytes
- jsonData, err := json.Marshal(collectorInfo)
- if err != nil {
- r.Logger.Error(err, "Failed to marshal collector info to JSON")
- return
- }
-
- onlineMsg := &pb.Message{
- Type: pb.MessageType_GO_ONLINE,
- Direction: pb.Direction_REQUEST,
- Identity: identity,
- Msg: jsonData,
- }
-
- r.Logger.Info("Sending online message", "identity", identity, "type",
onlineMsg.Type)
-
- if err := r.client.SendMsg(onlineMsg); err != nil {
- r.Logger.Error(err, "Failed to send online message", "identity",
identity)
- } else {
- r.Logger.Info("Online message sent successfully", "identity", identity)
- }
- }
-}
-
-func (r *Runner) Info() collector.Info {
- return collector.Info{
- Name: "transport",
- }
-}
-
-func (r *Runner) Close() error {
- r.Logger.Info("transport close...")
- if r.client != nil {
- _ = r.client.Shutdown()
- }
- return nil
-}
-
-// GetClient returns the transport client (for testing and advanced usage)
-func (r *Runner) GetClient() transport.TransportClient {
- return r.client
-}
-
-// IsConnected returns whether the client is connected and started
-func (r *Runner) IsConnected() bool {
- return r.client != nil && r.client.IsStarted()
-}
diff --git a/internal/collector/config/config.go b/internal/config/config.go
similarity index 95%
rename from internal/collector/config/config.go
rename to internal/config/config.go
index a5bafdb..a5a299f 100644
--- a/internal/collector/config/config.go
+++ b/internal/config/config.go
@@ -22,10 +22,10 @@ import (
"os"
"gopkg.in/yaml.v3"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
+ collectortypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/err"
+ loggertypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config"
- collectortypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/err"
- loggertypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
diff --git a/internal/collector/config/config_factory.go
b/internal/config/config_factory.go
similarity index 98%
rename from internal/collector/config/config_factory.go
rename to internal/config/config_factory.go
index 47b82b5..7854d4f 100644
--- a/internal/collector/config/config_factory.go
+++ b/internal/config/config_factory.go
@@ -25,8 +25,8 @@ import (
"strconv"
"strings"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config"
- loggertypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
+ loggertypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
diff --git a/internal/collector/config/config_test.go
b/internal/config/config_test.go
similarity index 100%
rename from internal/collector/config/config_test.go
rename to internal/config/config_test.go
diff --git a/internal/collector/config/env_config.go
b/internal/config/env_config.go
similarity index 96%
rename from internal/collector/config/env_config.go
rename to internal/config/env_config.go
index a81acf3..40c09c1 100644
--- a/internal/collector/config/env_config.go
+++ b/internal/config/env_config.go
@@ -20,8 +20,8 @@ package config
import (
"os"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config"
- loggertypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
+ loggertypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
diff --git a/internal/collector/config/unified_config.go
b/internal/config/unified_config.go
similarity index 97%
rename from internal/collector/config/unified_config.go
rename to internal/config/unified_config.go
index bd0fde5..2640448 100644
--- a/internal/collector/config/unified_config.go
+++ b/internal/config/unified_config.go
@@ -20,8 +20,8 @@ package config
import (
"os"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config"
- loggertypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
+ loggertypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
diff --git a/internal/collector/basic/dns/.keep b/internal/job/cache/.keep
similarity index 100%
rename from internal/collector/basic/dns/.keep
rename to internal/job/cache/.keep
diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go
b/internal/job/collect/dispatch/metrics_collector.go
similarity index 98%
rename from internal/collector/common/collect/dispatch/metrics_collector.go
rename to internal/job/collect/dispatch/metrics_collector.go
index 7261ebe..4074275 100644
--- a/internal/collector/common/collect/dispatch/metrics_collector.go
+++ b/internal/job/collect/dispatch/metrics_collector.go
@@ -29,9 +29,9 @@ import (
"github.com/expr-lang/expr"
"github.com/expr-lang/expr/vm"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
- jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/strategy"
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/unit"
@@ -503,7 +503,7 @@ func formatNumber(value float64) string {
// CollectMetrics collects metrics using the appropriate collector and returns
results via channel
// This method uses Go's channel-based concurrency instead of Java's thread
pools
-func (mc *MetricsCollector) CollectMetrics(metrics *jobtypes.Metrics, job
*jobtypes.Job, timeout *jobtypes.Timeout) chan *jobtypes.CollectRepMetricsData {
+func (mc *MetricsCollector) CollectMetrics(metrics *jobtypes.Metrics, job
*jobtypes.Job, _ *jobtypes.Timeout) chan *jobtypes.CollectRepMetricsData {
resultChan := make(chan *jobtypes.CollectRepMetricsData, 1)
// Start collection in a goroutine (Go's lightweight thread)
diff --git
a/internal/collector/common/collect/dispatch/metrics_collector_test.go
b/internal/job/collect/dispatch/metrics_collector_test.go
similarity index 99%
rename from internal/collector/common/collect/dispatch/metrics_collector_test.go
rename to internal/job/collect/dispatch/metrics_collector_test.go
index bae124d..581de71 100644
--- a/internal/collector/common/collect/dispatch/metrics_collector_test.go
+++ b/internal/job/collect/dispatch/metrics_collector_test.go
@@ -22,9 +22,9 @@ package dispatch
import (
"testing"
- jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
- loggertype
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
+ loggertype
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
diff --git a/internal/collector/common/collect/lazy_message_router.go
b/internal/job/collect/lazy_message_router.go
similarity index 99%
rename from internal/collector/common/collect/lazy_message_router.go
rename to internal/job/collect/lazy_message_router.go
index a038099..9613fb0 100644
--- a/internal/collector/common/collect/lazy_message_router.go
+++ b/internal/job/collect/lazy_message_router.go
@@ -30,8 +30,8 @@ import (
"github.com/apache/arrow/go/v13/arrow/memory"
pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
- jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
diff --git
a/internal/collector/common/collect/metrics/hertzbeat_metrics_collector.go
b/internal/job/collect/metrics/hertzbeat_metrics_collector.go
similarity index 100%
rename from
internal/collector/common/collect/metrics/hertzbeat_metrics_collector.go
rename to internal/job/collect/metrics/hertzbeat_metrics_collector.go
diff --git a/internal/collector/common/collect/result_handler.go
b/internal/job/collect/result_handler.go
similarity index 90%
rename from internal/collector/common/collect/result_handler.go
rename to internal/job/collect/result_handler.go
index cd16820..1592e6c 100644
--- a/internal/collector/common/collect/result_handler.go
+++ b/internal/job/collect/result_handler.go
@@ -22,8 +22,8 @@ package collect
import (
"fmt"
- jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
@@ -35,7 +35,7 @@ type ResultHandlerImpl struct {
// ResultHandler interface for handling collection results
type ResultHandler interface {
- HandleCollectData(data *jobtypes.CollectRepMetricsData, job
*jobtypes.Job) error
+ HandleCollectData(data *job.CollectRepMetricsData, job *job.Job) error
}
// NewResultHandler creates a new result handler
@@ -47,7 +47,7 @@ func NewResultHandler(logger logger.Logger, messageRouter
MessageRouter) ResultH
}
// HandleCollectData processes the collection results
-func (rh *ResultHandlerImpl) HandleCollectData(data
*jobtypes.CollectRepMetricsData, job *jobtypes.Job) error {
+func (rh *ResultHandlerImpl) HandleCollectData(data
*job.CollectRepMetricsData, job *job.Job) error {
if data == nil {
rh.logger.Error(nil, "collect data is nil")
return fmt.Errorf("collect data is nil")
diff --git a/internal/collector/common/collect/strategy/strategy_factory.go
b/internal/job/collect/strategy/strategy_factory.go
similarity index 99%
rename from internal/collector/common/collect/strategy/strategy_factory.go
rename to internal/job/collect/strategy/strategy_factory.go
index 9fc9af1..a118aa4 100644
--- a/internal/collector/common/collect/strategy/strategy_factory.go
+++ b/internal/job/collect/strategy/strategy_factory.go
@@ -21,7 +21,7 @@ import (
"fmt"
"sync"
- jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
diff --git a/internal/collector/common/dispatcher/common_dispatcher.go
b/internal/job/dispatcher/common_dispatcher.go
similarity index 98%
rename from internal/collector/common/dispatcher/common_dispatcher.go
rename to internal/job/dispatcher/common_dispatcher.go
index 288e0c1..eb6afcc 100644
--- a/internal/collector/common/dispatcher/common_dispatcher.go
+++ b/internal/job/dispatcher/common_dispatcher.go
@@ -25,8 +25,8 @@ import (
"sync"
"time"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect"
- jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect"
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
)
diff --git a/internal/collector/basic/ftp/.keep
b/internal/job/dispatcher/entrance/.keep
similarity index 100%
rename from internal/collector/basic/ftp/.keep
rename to internal/job/dispatcher/entrance/.keep
diff --git a/internal/collector/basic/icmp/.keep
b/internal/job/dispatcher/exporter/.keep
similarity index 100%
rename from internal/collector/basic/icmp/.keep
rename to internal/job/dispatcher/exporter/.keep
diff --git a/internal/collector/common/dispatcher/hashed_wheel_timer.go
b/internal/job/dispatcher/hashed_wheel_timer.go
similarity index 99%
rename from internal/collector/common/dispatcher/hashed_wheel_timer.go
rename to internal/job/dispatcher/hashed_wheel_timer.go
index 9100205..c65047d 100644
--- a/internal/collector/common/dispatcher/hashed_wheel_timer.go
+++ b/internal/job/dispatcher/hashed_wheel_timer.go
@@ -25,7 +25,7 @@ import (
"sync/atomic"
"time"
- jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
diff --git a/internal/collector/common/dispatcher/time_wheel.go
b/internal/job/dispatcher/time_wheel.go
similarity index 95%
rename from internal/collector/common/dispatcher/time_wheel.go
rename to internal/job/dispatcher/time_wheel.go
index d8c7aa1..a93df77 100644
--- a/internal/collector/common/dispatcher/time_wheel.go
+++ b/internal/job/dispatcher/time_wheel.go
@@ -25,14 +25,14 @@ import (
"sync"
"time"
- jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
// JobInfo holds information about a scheduled job
type JobInfo struct {
- Job *jobtypes.Job
- Timeout *jobtypes.Timeout
+ Job *job.Job
+ Timeout *job.Timeout
CreatedAt time.Time
LastExecutedAt time.Time
NextExecutionAt time.Time
@@ -70,12 +70,12 @@ type DispatcherStats struct {
// MetricsTaskDispatcher interface for metrics task dispatching
type MetricsTaskDispatcher interface {
- DispatchMetricsTask(ctx context.Context, job *jobtypes.Job, timeout
*jobtypes.Timeout) error
+ DispatchMetricsTask(ctx context.Context, job *job.Job, timeout
*job.Timeout) error
}
// HashedWheelTimer interface for time wheel operations
type HashedWheelTimer interface {
- NewTimeout(task jobtypes.TimerTask, delay time.Duration)
*jobtypes.Timeout
+ NewTimeout(task job.TimerTask, delay time.Duration) *job.Timeout
Start(ctx context.Context) error
Stop() error
GetStats() map[string]interface{}
@@ -101,7 +101,7 @@ func NewTimeDispatch(logger logger.Logger, commonDispatcher
MetricsTaskDispatche
// AddJob adds a job to the time-based scheduler
// This corresponds to Java's TimerDispatcher.addJob method
-func (td *TimeDispatch) AddJob(job *jobtypes.Job) error {
+func (td *TimeDispatch) AddJob(job *job.Job) error {
if job == nil {
return fmt.Errorf("job cannot be nil")
}
@@ -172,7 +172,7 @@ func (td *TimeDispatch) RemoveJob(jobID int64) error {
// Try to remove from cyclic tasks
if timeoutInterface, exists := td.cyclicTasks.LoadAndDelete(jobID);
exists {
- if timeout, ok := timeoutInterface.(*jobtypes.Timeout); ok {
+ if timeout, ok := timeoutInterface.(*job.Timeout); ok {
timeout.Cancel()
td.jobInfos.Delete(jobID)
td.logger.V(1).Info("removed cyclic job", "jobID",
jobID)
@@ -182,7 +182,7 @@ func (td *TimeDispatch) RemoveJob(jobID int64) error {
// Try to remove from temp tasks
if timeoutInterface, exists := td.tempTasks.LoadAndDelete(jobID);
exists {
- if timeout, ok := timeoutInterface.(*jobtypes.Timeout); ok {
+ if timeout, ok := timeoutInterface.(*job.Timeout); ok {
timeout.Cancel()
td.jobInfos.Delete(jobID)
td.logger.V(1).Info("removed one-time job", "jobID",
jobID)
@@ -233,14 +233,14 @@ func (td *TimeDispatch) Stop() error {
// Cancel all running tasks
td.cyclicTasks.Range(func(key, value interface{}) bool {
- if timeout, ok := value.(*jobtypes.Timeout); ok {
+ if timeout, ok := value.(*job.Timeout); ok {
timeout.Cancel()
}
return true
})
td.tempTasks.Range(func(key, value interface{}) bool {
- if timeout, ok := value.(*jobtypes.Timeout); ok {
+ if timeout, ok := value.(*job.Timeout); ok {
timeout.Cancel()
}
return true
@@ -339,7 +339,7 @@ func (td *TimeDispatch) UpdateJobExecution(jobID int64) {
}
// RescheduleJob reschedules a cyclic job for the next execution
-func (td *TimeDispatch) RescheduleJob(job *jobtypes.Job) error {
+func (td *TimeDispatch) RescheduleJob(job *job.Job) error {
if !job.IsCyclic || job.DefaultInterval <= 0 {
return fmt.Errorf("job is not cyclable or has invalid interval")
}
diff --git a/internal/collector/common/dispatcher/wheel_timer_task.go
b/internal/job/dispatcher/wheel_timer_task.go
similarity index 87%
rename from internal/collector/common/dispatcher/wheel_timer_task.go
rename to internal/job/dispatcher/wheel_timer_task.go
index 657103a..09bfb18 100644
--- a/internal/collector/common/dispatcher/wheel_timer_task.go
+++ b/internal/job/dispatcher/wheel_timer_task.go
@@ -24,7 +24,7 @@ import (
"fmt"
"time"
- jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
@@ -34,20 +34,20 @@ type StatsRecorder interface {
RecordJobCompleted()
RecordJobFailed()
UpdateJobExecution(jobID int64)
- RescheduleJob(job *jobtypes.Job) error
+ RescheduleJob(job *job.Job) error
}
// WheelTimerTask represents a task that runs in the time wheel
// This corresponds to Java's WheelTimerTask
type WheelTimerTask struct {
- job *jobtypes.Job
+ job *job.Job
commonDispatcher MetricsTaskDispatcher
statsRecorder StatsRecorder
logger logger.Logger
}
// NewWheelTimerTask creates a new wheel timer task
-func NewWheelTimerTask(job *jobtypes.Job, commonDispatcher
MetricsTaskDispatcher, statsRecorder StatsRecorder, logger logger.Logger)
*WheelTimerTask {
+func NewWheelTimerTask(job *job.Job, commonDispatcher MetricsTaskDispatcher,
statsRecorder StatsRecorder, logger logger.Logger) *WheelTimerTask {
return &WheelTimerTask{
job: job,
commonDispatcher: commonDispatcher,
@@ -58,7 +58,7 @@ func NewWheelTimerTask(job *jobtypes.Job, commonDispatcher
MetricsTaskDispatcher
// Run executes the timer task
// This corresponds to Java's WheelTimerTask.run method
-func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout) error {
+func (wtt *WheelTimerTask) Run(timeout *job.Timeout) error {
if wtt.job == nil {
wtt.logger.Error(nil, "job is nil, cannot execute")
return fmt.Errorf("job is nil, cannot execute")
@@ -116,7 +116,7 @@ func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout)
error {
}
// rescheduleJob reschedules a cyclic job for the next execution
-func (wtt *WheelTimerTask) rescheduleJob(timeout *jobtypes.Timeout) {
+func (wtt *WheelTimerTask) rescheduleJob(timeout *job.Timeout) {
if wtt.job.DefaultInterval <= 0 {
wtt.logger.Info("job has no valid interval, not rescheduling",
"jobID", wtt.job.ID)
@@ -134,6 +134,6 @@ func (wtt *WheelTimerTask) rescheduleJob(timeout
*jobtypes.Timeout) {
}
// GetJob returns the associated job
-func (wtt *WheelTimerTask) GetJob() *jobtypes.Job {
+func (wtt *WheelTimerTask) GetJob() *job.Job {
return wtt.job
}
diff --git a/internal/collector/common/router/message_router.go
b/internal/job/router/message_router.go
similarity index 93%
rename from internal/collector/common/router/message_router.go
rename to internal/job/router/message_router.go
index e3093ee..e03d9c2 100644
--- a/internal/collector/common/router/message_router.go
+++ b/internal/job/router/message_router.go
@@ -24,9 +24,9 @@ import (
"fmt"
pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/job"
- jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/server"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
+ job2 "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
@@ -50,7 +50,7 @@ type MessageRouter interface {
RegisterProcessors()
// SendResult sends collection results back to the manager
- SendResult(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job)
error
+ SendResult(data *job2.CollectRepMetricsData, job *job2.Job) error
// GetIdentity returns the collector identity
GetIdentity() string
@@ -60,7 +60,7 @@ type MessageRouter interface {
type MessageRouterImpl struct {
logger logger.Logger
client transport.TransportClient
- jobRunner *job.Runner
+ jobRunner *server.Runner
identity string
}
@@ -68,7 +68,7 @@ type MessageRouterImpl struct {
type Config struct {
Logger logger.Logger
Client transport.TransportClient
- JobRunner *job.Runner
+ JobRunner *server.Runner
Identity string
}
@@ -122,7 +122,7 @@ func (r *MessageRouterImpl) handleIssueCyclicTask(msg
*pb.Message) (*pb.Message,
r.logger.Info("Handling cyclic task message")
// Parse job from message
- var job jobtypes.Job
+ var job job2.Job
if err := json.Unmarshal(msg.Msg, &job); err != nil {
r.logger.Error(err, "failed to unmarshal job")
return &pb.Message{
@@ -189,7 +189,7 @@ func (r *MessageRouterImpl) handleIssueOneTimeTask(msg
*pb.Message) (*pb.Message
r.logger.Info("Handling one-time task message")
// Parse job from message
- var job jobtypes.Job
+ var job job2.Job
if err := json.Unmarshal(msg.Msg, &job); err != nil {
r.logger.Error(err, "failed to unmarshal job")
return &pb.Message{
@@ -223,7 +223,7 @@ func (r *MessageRouterImpl) handleIssueOneTimeTask(msg
*pb.Message) (*pb.Message
}
// SendResult sends collection results back to the manager
-func (r *MessageRouterImpl) SendResult(data *jobtypes.CollectRepMetricsData,
job *jobtypes.Job) error {
+func (r *MessageRouterImpl) SendResult(data *job2.CollectRepMetricsData, job
*job2.Job) error {
if r.client == nil || !r.client.IsStarted() {
return fmt.Errorf("transport client not started")
}
@@ -237,7 +237,7 @@ func (r *MessageRouterImpl) SendResult(data
*jobtypes.CollectRepMetricsData, job
}
// Serialize metrics data
- dataBytes, err := json.Marshal([]jobtypes.CollectRepMetricsData{*data})
+ dataBytes, err := json.Marshal([]job2.CollectRepMetricsData{*data})
if err != nil {
return fmt.Errorf("failed to marshal metrics data: %w", err)
}
diff --git a/internal/collector/common/job/job_server.go
b/internal/job/server/job_server.go
similarity index 92%
rename from internal/collector/common/job/job_server.go
rename to internal/job/server/job_server.go
index 4451ffa..0a47a0e 100644
--- a/internal/collector/common/job/job_server.go
+++ b/internal/job/server/job_server.go
@@ -17,7 +17,7 @@
* under the License.
*/
-package job
+package server
import (
"context"
@@ -25,13 +25,13 @@ import (
"fmt"
"sync"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/dispatch"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/dispatcher"
- clrserver
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector"
- jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/dispatch"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/strategy"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/dispatcher"
+ clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector"
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
)
// TimeDispatcher interface defines time-based job scheduling
diff --git a/internal/collector/common/types/collector/collector_types.go
b/internal/metrics/metrics.go
similarity index 91%
copy from internal/collector/common/types/collector/collector_types.go
copy to internal/metrics/metrics.go
index 4ab91e4..5e29267 100644
--- a/internal/collector/common/types/collector/collector_types.go
+++ b/internal/metrics/metrics.go
@@ -17,8 +17,4 @@
* under the License.
*/
-package collector
-
-type Info struct {
- Name string `json:"name" yaml:"name"`
-}
+package metrics
diff --git a/internal/collector/basic/database/jdbc_collector.go
b/internal/protocol/basic/database/jdbc_collector.go
similarity index 98%
rename from internal/collector/basic/database/jdbc_collector.go
rename to internal/protocol/basic/database/jdbc_collector.go
index ec8f179..a84c6ac 100644
--- a/internal/collector/basic/database/jdbc_collector.go
+++ b/internal/protocol/basic/database/jdbc_collector.go
@@ -38,10 +38,10 @@ import (
_ "github.com/microsoft/go-mssqldb"
_ "github.com/sijms/go-ora/v2"
"golang.org/x/crypto/ssh"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/strategy"
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
+ protocol2
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
- jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
@@ -192,7 +192,7 @@ type sshTunnelHelper struct {
// startSSHTunnel starts an SSH tunnel
// It connects to the SSH bastion, listens on a local random port,
// and forwards traffic to the target database.
-func startSSHTunnel(config *protocol.SSHTunnel, remoteHost, remotePort string,
timeout time.Duration, log logger.Logger) (*sshTunnelHelper, string, string,
error) {
+func startSSHTunnel(config *protocol2.SSHTunnel, remoteHost, remotePort
string, timeout time.Duration, log logger.Logger) (*sshTunnelHelper, string,
string, error) {
sshHost := config.Host
sshPort, err := strconv.Atoi(config.Port)
if err != nil {
@@ -322,7 +322,7 @@ func NewJDBCCollector(logger logger.Logger) *JDBCCollector {
}
// extractJDBCConfig extracts JDBC configuration from interface{} type
-func extractJDBCConfig(jdbcInterface interface{}) (*protocol.JDBCProtocol,
error) {
+func extractJDBCConfig(jdbcInterface interface{}) (*protocol2.JDBCProtocol,
error) {
replacer := param.NewReplacer()
return replacer.ExtractJDBCConfig(jdbcInterface)
}
@@ -404,7 +404,7 @@ func (jc *JDBCCollector) PreCheck(metrics
*jobtypes.Metrics) error {
}
// checkTunnelParam validates SSH tunnel configuration
-func (jc *JDBCCollector) checkTunnelParam(config *protocol.SSHTunnel) error {
+func (jc *JDBCCollector) checkTunnelParam(config *protocol2.SSHTunnel) error {
if config == nil {
return nil
}
@@ -610,7 +610,7 @@ func validateURL(rawURL string, platform string) error {
}
// constructDatabaseURL constructs the database connection URL/DSN
-func (jc *JDBCCollector) constructDatabaseURL(jdbc *protocol.JDBCProtocol,
host, port string) (string, error) {
+func (jc *JDBCCollector) constructDatabaseURL(jdbc *protocol2.JDBCProtocol,
host, port string) (string, error) {
// 1. If user provided a full URL, use it (already validated in
PreCheck)
if jdbc.URL != "" {
// Validate again to prevent bypassing PreCheck
diff --git a/internal/collector/basic/database/jdbc_collector_test.go
b/internal/protocol/basic/database/jdbc_collector_test.go
similarity index 100%
rename from internal/collector/basic/database/jdbc_collector_test.go
rename to internal/protocol/basic/database/jdbc_collector_test.go
diff --git a/internal/collector/basic/imap/.keep
b/internal/protocol/basic/dns/.keep
similarity index 100%
rename from internal/collector/basic/imap/.keep
rename to internal/protocol/basic/dns/.keep
diff --git a/internal/collector/basic/ipmi2/.keep
b/internal/protocol/basic/ftp/.keep
similarity index 100%
rename from internal/collector/basic/ipmi2/.keep
rename to internal/protocol/basic/ftp/.keep
diff --git a/internal/protocol/basic/http/http_collector.go
b/internal/protocol/basic/http/http_collector.go
new file mode 100644
index 0000000..11a8b9b
--- /dev/null
+++ b/internal/protocol/basic/http/http_collector.go
@@ -0,0 +1,596 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package http
+
+import (
+ "bytes"
+ "crypto/md5"
+ "crypto/rand"
+ "crypto/tls"
+ "encoding/hex"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/prometheus/common/expfmt"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/strategy"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// init hands the HTTP collector factory to strategy.RegisterFactory under the
+// ProtocolHTTP key when this package is initialised.
+func init() {
+	newCollector := func(log logger.Logger) strategy.Collector {
+		return NewHTTPCollector(log)
+	}
+	strategy.RegisterFactory(ProtocolHTTP, newCollector)
+}
+
+// ProtocolHTTP is the protocol name this collector reports and registers under.
+const ProtocolHTTP = "http"
+
+// Parse types: values of the HTTP configuration's ParseType that Collect uses
+// to choose a response parser.
+const (
+	ParseTypeDefault    = "default"
+	ParseTypeJsonPath   = "jsonPath"
+	ParseTypePrometheus = "prometheus"
+	ParseTypeWebsite    = "website"
+	ParseTypeHeader     = "header"
+)
+
+// Auth types: values compared case-insensitively against the configured
+// authorization type.
+const (
+	AuthTypeBasic  = "Basic Auth"
+	AuthTypeBearer = "Bearer Token"
+	AuthTypeDigest = "Digest Auth"
+)
+
+// HTTPCollector collects metrics by issuing HTTP requests.
+type HTTPCollector struct {
+	logger logger.Logger
+}
+
+// NewHTTPCollector returns an HTTPCollector whose logger is derived from log
+// with the name "http-collector".
+func NewHTTPCollector(log logger.Logger) *HTTPCollector {
+	collector := new(HTTPCollector)
+	collector.logger = log.WithName("http-collector")
+	return collector
+}
+
+// Protocol returns the protocol name handled by this collector.
+func (hc *HTTPCollector) Protocol() string {
+	return ProtocolHTTP
+}
+
+// PreCheck reports an error when metrics is nil or carries no HTTP
+// configuration; otherwise it returns nil.
+func (hc *HTTPCollector) PreCheck(metrics *job.Metrics) error {
+	if metrics != nil && metrics.HTTP != nil {
+		return nil
+	}
+	return fmt.Errorf("http configuration is missing")
+}
+
+// Collect performs one HTTP collection for metrics and returns the result.
+//
+// It resolves the target URL (from the HTTP configuration, or from the "host"
+// and "port" entries of metrics.ConfigMap when no URL is configured), sends the
+// request, retries once with a Digest Authorization header when the server
+// answers 401 to a Digest-configured request, parses the response according to
+// the configured parse type and finally fills the response-time and status-code
+// columns. Failures are reported through the Code and Msg of the returned data.
+func (hc *HTTPCollector) Collect(metrics *job.Metrics) *job.CollectRepMetricsData {
+	start := time.Now()
+
+	// MetricsCollector has already performed parameter replacement on metrics.HTTP
+	httpConfig := metrics.HTTP
+	if httpConfig == nil {
+		// PreCheck normally rejects this; guard so a skipped PreCheck cannot panic.
+		return hc.createFailResponse(metrics, constants.CollectFail, "http configuration is missing")
+	}
+
+	// 1. Prepare URL
+
+	// Parse SSL bool string; an unparsable value is treated as false.
+	isSSL := false
+	if httpConfig.SSL != "" {
+		if val, err := strconv.ParseBool(httpConfig.SSL); err == nil {
+			isSSL = val
+		}
+	}
+	schema := "http"
+	if isSSL {
+		schema = "https"
+	}
+
+	targetURL := httpConfig.URL
+	if targetURL == "" {
+		// Use metrics.ConfigMap values if URL is empty
+		// Note: metrics.ConfigMap is already populated with job.Configmap values
+		host := metrics.ConfigMap["host"]
+		port := metrics.ConfigMap["port"]
+		if host != "" && port != "" {
+			targetURL = fmt.Sprintf("%s://%s:%s", schema, host, port)
+		}
+	}
+
+	if targetURL != "" {
+		// Match the full scheme separator: a bare "http" prefix check would treat
+		// hosts such as "httpbin.org" as if they already carried a scheme.
+		lowered := strings.ToLower(targetURL)
+		if !strings.HasPrefix(lowered, "http://") && !strings.HasPrefix(lowered, "https://") {
+			targetURL = fmt.Sprintf("%s://%s", schema, targetURL)
+		}
+	}
+
+	if targetURL == "" {
+		return hc.createFailResponse(metrics, constants.CollectFail, "target URL is empty")
+	}
+
+	// 2. Create Request
+	req, err := hc.createRequest(httpConfig, targetURL)
+	if err != nil {
+		return hc.createFailResponse(metrics, constants.CollectFail, fmt.Sprintf("failed to create request: %v", err))
+	}
+
+	// 3. Create Client
+	timeoutStr := metrics.Timeout
+	if httpConfig.Timeout != "" {
+		timeoutStr = httpConfig.Timeout
+	}
+
+	timeout := hc.getTimeout(timeoutStr)
+	client := &http.Client{
+		Timeout: timeout,
+		Transport: &http.Transport{
+			// NOTE(review): certificate verification is disabled for every target;
+			// confirm this is intended before relying on HTTPS results.
+			TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+		},
+	}
+	// The transport is private to this call, so its keep-alive connections would
+	// otherwise stay open after the function returns.
+	defer client.CloseIdleConnections()
+
+	// 4. Execute Request (with Digest retry logic)
+	resp, err := client.Do(req)
+
+	// Handle Digest Auth Challenge (401)
+	if err == nil && resp.StatusCode == http.StatusUnauthorized &&
+		httpConfig.Authorization != nil &&
+		strings.EqualFold(httpConfig.Authorization.Type, AuthTypeDigest) {
+
+		if authHeader := resp.Header.Get("WWW-Authenticate"); authHeader != "" {
+			// Create new request with Authorization header
+			digestReq, digestErr := hc.handleDigestAuth(req, authHeader, httpConfig.Authorization)
+			if digestErr != nil {
+				// Keep the original 401 response open so it is still parsed below.
+				hc.logger.Error(digestErr, "failed to handle digest auth")
+			} else {
+				// Drain and close the first response only once a retry is certain.
+				_, _ = io.Copy(io.Discard, resp.Body)
+				resp.Body.Close()
+
+				// Retry request
+				resp, err = client.Do(digestReq)
+			}
+		}
+	}
+
+	if err != nil {
+		return hc.createFailResponse(metrics, constants.CollectUnReachable, fmt.Sprintf("request failed: %v", err))
+	}
+	defer resp.Body.Close()
+
+	responseTime := time.Since(start).Milliseconds()
+
+	// 5. Parse Response
+	parseType := httpConfig.ParseType
+	if parseType == "" {
+		parseType = ParseTypeDefault
+	}
+
+	// Read body
+	bodyBytes, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return hc.createFailResponse(metrics, constants.CollectFail, fmt.Sprintf("failed to read body: %v", err))
+	}
+
+	var responseData *job.CollectRepMetricsData
+	switch parseType {
+	case ParseTypePrometheus:
+		responseData, err = hc.parsePrometheus(bodyBytes, metrics)
+	case ParseTypeWebsite:
+		responseData, err = hc.parseWebsite(bodyBytes, resp.StatusCode, responseTime, metrics, httpConfig)
+	case ParseTypeHeader:
+		responseData, err = hc.parseHeader(resp.Header, metrics)
+	default:
+		// ParseTypeJsonPath, ParseTypeDefault and unknown parse types all use the JSON parser.
+		responseData, err = hc.parseJsonPath(bodyBytes, metrics, responseTime, httpConfig)
+	}
+
+	if err != nil {
+		hc.logger.Error(err, "parse response failed", "type", parseType)
+		return hc.createFailResponse(metrics, constants.CollectFail, fmt.Sprintf("parse error: %v", err))
+	}
+
+	hc.fillCommonFields(responseData, metrics.AliasFields, responseTime, resp.StatusCode)
+
+	return responseData
+}
+
+// handleDigestAuth generates a new request with the Digest Authorization header.
+//
+// authHeader is the WWW-Authenticate challenge and authConfig supplies the
+// Digest username and password. Only the MD5 algorithm with either no qop or
+// qop=auth is implemented; any other challenge yields an error rather than a
+// response computed with the wrong algorithm. The returned request is a clone
+// of originalReq (same method, URL, headers, host and context) with a rewound
+// body, so originalReq itself is left untouched.
+func (hc *HTTPCollector) handleDigestAuth(originalReq *http.Request, authHeader string, authConfig *protocol.Authorization) (*http.Request, error) {
+	if authConfig == nil {
+		return nil, fmt.Errorf("digest auth configuration is missing")
+	}
+
+	params := hc.parseAuthHeader(authHeader)
+	realm := params["realm"]
+	nonce := params["nonce"]
+	qop := params["qop"]
+	opaque := params["opaque"]
+	algorithm := params["algorithm"]
+	if algorithm == "" {
+		algorithm = "MD5"
+	}
+
+	if realm == "" || nonce == "" {
+		return nil, fmt.Errorf("missing realm or nonce in digest header")
+	}
+	// The hashes below are always MD5; echoing another algorithm name back to the
+	// server would only produce a second 401.
+	if !strings.EqualFold(algorithm, "MD5") {
+		return nil, fmt.Errorf("unsupported digest algorithm: %s", algorithm)
+	}
+
+	// qop is a comma-separated list of options; "auth" must be offered exactly,
+	// a substring match would also accept "auth-int".
+	useQop := false
+	if qop != "" {
+		for _, option := range strings.Split(qop, ",") {
+			if strings.EqualFold(strings.TrimSpace(option), "auth") {
+				useQop = true
+				break
+			}
+		}
+		if !useQop {
+			return nil, fmt.Errorf("unsupported qop: %s", qop)
+		}
+	}
+
+	username := authConfig.DigestAuthUsername
+	password := authConfig.DigestAuthPassword
+	ha1 := hc.md5Hex(fmt.Sprintf("%s:%s:%s", username, realm, password))
+
+	uri := originalReq.URL.RequestURI()
+	ha2 := hc.md5Hex(fmt.Sprintf("%s:%s", originalReq.Method, uri))
+
+	const nc = "00000001"
+	var cnonce, responseStr string
+	if useQop {
+		cnonce = hc.generateCnonce()
+		responseStr = hc.md5Hex(fmt.Sprintf("%s:%s:%s:%s:%s:%s", ha1, nonce, nc, cnonce, "auth", ha2))
+	} else {
+		responseStr = hc.md5Hex(fmt.Sprintf("%s:%s:%s", ha1, nonce, ha2))
+	}
+
+	authVal := fmt.Sprintf(`Digest username="%s", realm="%s", nonce="%s", uri="%s", response="%s", algorithm="%s"`,
+		username, realm, nonce, uri, responseStr, algorithm)
+
+	if opaque != "" {
+		authVal += fmt.Sprintf(`, opaque="%s"`, opaque)
+	}
+	if useQop {
+		authVal += fmt.Sprintf(`, qop=auth, nc=%s, cnonce="%s"`, nc, cnonce)
+	}
+
+	// Clone deep-copies the headers and keeps Host, ContentLength, GetBody and the
+	// context; only the already-consumed Body has to be replaced.
+	newReq := originalReq.Clone(originalReq.Context())
+	switch {
+	case originalReq.GetBody != nil:
+		body, err := originalReq.GetBody()
+		if err != nil {
+			return nil, fmt.Errorf("rewinding request body for digest retry: %w", err)
+		}
+		newReq.Body = body
+	case originalReq.Body != nil && originalReq.Body != http.NoBody:
+		// The first attempt consumed the body and it cannot be recreated.
+		return nil, fmt.Errorf("request body cannot be replayed for digest retry")
+	}
+
+	newReq.Header.Set("Authorization", authVal)
+	return newReq, nil
+}
+
+func (hc *HTTPCollector) parseAuthHeader(header string) map[string]string {
+ header = strings.TrimPrefix(header, "Digest ")
+ result := make(map[string]string)
+
+ pairs := strings.Split(header, ",")
+ for _, pair := range pairs {
+ parts := strings.SplitN(strings.TrimSpace(pair), "=", 2)
+ if len(parts) == 2 {
+ key := strings.TrimSpace(parts[0])
+ val := strings.TrimSpace(parts[1])
+ val = strings.Trim(val, "\"")
+ result[key] = val
+ }
+ }
+ return result
+}
+
+// md5Hex returns the lower-case hexadecimal MD5 digest of s.
+func (hc *HTTPCollector) md5Hex(s string) string {
+	hasher := md5.New()
+	_, _ = io.WriteString(hasher, s) // writes to a hash never fail
+	return hex.EncodeToString(hasher.Sum(nil))
+}
+
+// generateCnonce returns a client nonce for Digest authentication: 8 random
+// bytes encoded as 16 hexadecimal characters.
+//
+// If the random source cannot be read, the error is logged and a value derived
+// from the current time is returned instead of a constant all-zero nonce.
+func (hc *HTTPCollector) generateCnonce() string {
+	b := make([]byte, 8)
+	if _, err := io.ReadFull(rand.Reader, b); err != nil {
+		hc.logger.Error(err, "failed to read random bytes for digest cnonce")
+		return strconv.FormatInt(time.Now().UnixNano(), 16)
+	}
+	return hex.EncodeToString(b)
+}
+
+// createRequest builds the outgoing request for targetURL from config.
+//
+// The method defaults to GET and is upper-cased. A non-empty config.Body
+// becomes the request body. Configured headers are applied, then Basic or
+// Bearer credentials when the authorization type asks for them (an empty type
+// is treated as Basic; Digest is handled later, on a 401 challenge). Finally
+// config.Params are appended to the URL query string.
+func (hc *HTTPCollector) createRequest(config *protocol.HTTPProtocol, targetURL string) (*http.Request, error) {
+	method := strings.ToUpper(config.Method)
+	if method == "" {
+		method = "GET"
+	}
+
+	var body io.Reader
+	if config.Body != "" {
+		body = strings.NewReader(config.Body)
+	}
+
+	req, err := http.NewRequest(method, targetURL, body)
+	if err != nil {
+		return nil, err
+	}
+
+	for k, v := range config.Headers {
+		// net/http ignores a "Host" entry in the header map of a client request;
+		// the override has to go through req.Host.
+		if strings.EqualFold(k, "Host") {
+			req.Host = v
+			continue
+		}
+		req.Header.Set(k, v)
+	}
+
+	if config.Authorization != nil {
+		auth := config.Authorization
+		if auth.Type == "" || strings.EqualFold(auth.Type, AuthTypeBasic) {
+			if auth.BasicAuthUsername != "" && auth.BasicAuthPassword != "" {
+				req.SetBasicAuth(auth.BasicAuthUsername, auth.BasicAuthPassword)
+			}
+		} else if strings.EqualFold(auth.Type, AuthTypeBearer) {
+			if auth.BearerTokenToken != "" {
+				req.Header.Set("Authorization", "Bearer "+auth.BearerTokenToken)
+			}
+		}
+	}
+
+	if len(config.Params) > 0 {
+		// Start from the query already present in targetURL so it is preserved.
+		q := req.URL.Query()
+		for k, v := range config.Params {
+			q.Add(k, v)
+		}
+		req.URL.RawQuery = q.Encode()
+	}
+
+	return req, nil
+}
+
+// --- Parsers ---
+
+// parsePrometheus parses body as Prometheus text exposition format and emits
+// one row per metric sample.
+//
+// For every alias field, "value" and "prom_value" receive the sample's gauge,
+// counter or untyped value; any other field is looked up among the sample's
+// labels. Fields without data get constants.NULL_VALUE, except fields that are
+// themselves equal to constants.NULL_VALUE, which are left empty. A sample
+// contributes a row only when at least one field was actually filled. Values
+// are formatted with the shortest decimal representation that round-trips, so
+// small magnitudes are not truncated to six decimals.
+func (hc *HTTPCollector) parsePrometheus(body []byte, metrics *job.Metrics) (*job.CollectRepMetricsData, error) {
+	response := hc.createSuccessResponse(metrics)
+	var parser expfmt.TextParser
+	metricFamilies, err := parser.TextToMetricFamilies(bytes.NewReader(body))
+	if err != nil {
+		return nil, err
+	}
+	now := time.Now().UnixMilli()
+	for _, mf := range metricFamilies {
+		for _, m := range mf.Metric {
+			// Resolve the numeric sample once; other metric kinds carry no single value.
+			var sample float64
+			hasSample := true
+			switch {
+			case m.Gauge != nil:
+				sample = m.Gauge.GetValue()
+			case m.Counter != nil:
+				sample = m.Counter.GetValue()
+			case m.Untyped != nil:
+				sample = m.Untyped.GetValue()
+			default:
+				hasSample = false
+			}
+
+			labels := make(map[string]string, len(m.Label))
+			for _, pair := range m.Label {
+				labels[pair.GetName()] = pair.GetValue()
+			}
+
+			row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))}
+			hasValue := false
+			for i, field := range metrics.AliasFields {
+				if field == constants.NULL_VALUE {
+					continue
+				}
+				if field == "value" || field == "prom_value" {
+					if hasSample {
+						row.Columns[i] = strconv.FormatFloat(sample, 'f', -1, 64)
+						hasValue = true
+					} else {
+						row.Columns[i] = constants.NULL_VALUE
+					}
+					continue
+				}
+				if val, ok := labels[field]; ok {
+					row.Columns[i] = val
+					hasValue = true
+				} else {
+					row.Columns[i] = constants.NULL_VALUE
+				}
+			}
+			if hasValue {
+				response.Values = append(response.Values, row)
+			}
+		}
+	}
+	response.Time = now
+	return response, nil
+}
+
+// parseWebsite produces a single row describing the page fetch: the status
+// code, the response time and the number of occurrences of the configured
+// keyword in body. Alias fields are matched case-insensitively; any other
+// field is set to constants.NULL_VALUE.
+func (hc *HTTPCollector) parseWebsite(body []byte, statusCode int, responseTime int64, metrics *job.Metrics, httpConfig *protocol.HTTPProtocol) (*job.CollectRepMetricsData, error) {
+	result := hc.createSuccessResponse(metrics)
+
+	matches := 0
+	if kw := httpConfig.Keyword; kw != "" {
+		matches = strings.Count(string(body), kw)
+	}
+
+	statusKey := strings.ToLower(constants.StatusCode)
+	timeKey := strings.ToLower(constants.RESPONSE_TIME)
+
+	columns := make([]string, len(metrics.AliasFields))
+	for idx, alias := range metrics.AliasFields {
+		name := strings.ToLower(alias)
+		if name == statusKey {
+			columns[idx] = strconv.Itoa(statusCode)
+		} else if name == timeKey {
+			columns[idx] = strconv.FormatInt(responseTime, 10)
+		} else if name == "keyword" {
+			columns[idx] = strconv.Itoa(matches)
+		} else {
+			columns[idx] = constants.NULL_VALUE
+		}
+	}
+
+	result.Values = append(result.Values, job.ValueRow{Columns: columns})
+	return result, nil
+}
+
+// parseHeader produces a single row whose columns are the response header
+// values named by the alias fields; a header that is absent or empty yields
+// constants.NULL_VALUE.
+func (hc *HTTPCollector) parseHeader(header http.Header, metrics *job.Metrics) (*job.CollectRepMetricsData, error) {
+	result := hc.createSuccessResponse(metrics)
+
+	columns := make([]string, 0, len(metrics.AliasFields))
+	for _, name := range metrics.AliasFields {
+		value := header.Get(name)
+		if value == "" {
+			value = constants.NULL_VALUE
+		}
+		columns = append(columns, value)
+	}
+
+	result.Values = append(result.Values, job.ValueRow{Columns: columns})
+	return result, nil
+}
+
+// parseJsonPath decodes body as JSON (numbers kept as json.Number), optionally
+// descends to the node named by the configured parse script, and turns that
+// node into rows: one row per element when it is an array, otherwise a single
+// row. A body that is not valid JSON, or a script that resolves to nothing,
+// yields a success response without rows. responseTime is not used here.
+func (hc *HTTPCollector) parseJsonPath(body []byte, metrics *job.Metrics, responseTime int64, httpConfig *protocol.HTTPProtocol) (*job.CollectRepMetricsData, error) {
+	result := hc.createSuccessResponse(metrics)
+
+	dec := json.NewDecoder(bytes.NewReader(body))
+	dec.UseNumber()
+	var doc interface{}
+	if decodeErr := dec.Decode(&doc); decodeErr != nil {
+		return result, nil
+	}
+
+	node := doc
+	if script := httpConfig.ParseScript; script != "" && script != "$" {
+		node = hc.navigateJson(doc, script)
+	}
+
+	switch typed := node.(type) {
+	case nil:
+		// Nothing at the requested location: no rows.
+	case []interface{}:
+		for _, element := range typed {
+			result.Values = append(result.Values, hc.extractRow(element, metrics.AliasFields))
+		}
+	default:
+		result.Values = append(result.Values, hc.extractRow(node, metrics.AliasFields))
+	}
+	return result, nil
+}
+
+// navigateJson walks data along a dot-separated path and returns the node it
+// reaches, or nil when any step cannot be followed.
+//
+// A leading "$." is removed first; an empty remainder returns data itself.
+// Each segment may be a key, a key followed by a numeric index ("items[2]"),
+// or a bare index ("[0]"). A bracketed part that is not a non-negative integer
+// is ignored, so such a segment behaves like the key alone.
+func (hc *HTTPCollector) navigateJson(data interface{}, path string) interface{} {
+	trimmed := strings.TrimPrefix(path, "$.")
+	if trimmed == "" {
+		return data
+	}
+
+	node := data
+	for _, segment := range strings.Split(trimmed, ".") {
+		if node == nil {
+			return nil
+		}
+
+		// Separate "name[index]" into its parts; index stays -1 when absent or invalid.
+		name, index := segment, -1
+		if open := strings.Index(segment, "["); open > -1 && strings.HasSuffix(segment, "]") {
+			name = segment[:open]
+			if parsed, err := strconv.Atoi(segment[open+1 : len(segment)-1]); err == nil {
+				index = parsed
+			}
+		}
+
+		if name != "" {
+			object, isObject := node.(map[string]interface{})
+			if !isObject {
+				return nil
+			}
+			child, present := object[name]
+			if !present {
+				return nil
+			}
+			node = child
+		}
+
+		if index > -1 {
+			list, isList := node.([]interface{})
+			if !isList || index >= len(list) {
+				return nil
+			}
+			node = list[index]
+		}
+	}
+	return node
+}
+
+func (hc *HTTPCollector) extractRow(item interface{}, fields []string)
job.ValueRow {
+ row := job.ValueRow{Columns: make([]string, len(fields))}
+ m, isMap := item.(map[string]interface{})
+ for i, field := range fields {
+ if strings.EqualFold(field, constants.RESPONSE_TIME) ||
strings.EqualFold(field, constants.StatusCode) {
+ row.Columns[i] = constants.NULL_VALUE
+ continue
+ }
+ var val interface{}
+ if isMap {
+ if v, ok := m[field]; ok {
+ val = v
+ } else {
+ val = hc.navigateJson(item, field)
+ }
+ }
+ if val != nil {
+ row.Columns[i] = fmt.Sprintf("%v", val)
+ } else {
+ row.Columns[i] = constants.NULL_VALUE
+ }
+ }
+ return row
+}
+
+func (hc *HTTPCollector) fillCommonFields(resp *job.CollectRepMetricsData,
fields []string, responseTime int64, statusCode int) {
+ if resp == nil {
+ return
+ }
+ if len(resp.Fields) == 0 {
+ resp.Fields = make([]job.Field, len(fields))
+ for i, f := range fields {
+ resp.Fields[i] = job.Field{Field: f, Type:
constants.TYPE_STRING}
+ }
+ }
+ if len(resp.Values) == 0 {
+ row := job.ValueRow{Columns: make([]string, len(fields))}
+ for k := range row.Columns {
+ row.Columns[k] = constants.NULL_VALUE
+ }
+ resp.Values = append(resp.Values, row)
+ }
+ for i := range resp.Values {
+ for j, field := range fields {
+ if strings.EqualFold(field, constants.RESPONSE_TIME) {
+ resp.Values[i].Columns[j] =
strconv.FormatInt(responseTime, 10)
+ }
+ if strings.EqualFold(field, constants.StatusCode) {
+ resp.Values[i].Columns[j] =
strconv.Itoa(statusCode)
+ }
+ }
+ }
+}
+
+func (hc *HTTPCollector) getTimeout(timeoutStr string) time.Duration {
+ if timeoutStr == "" {
+ return 10 * time.Second
+ }
+ if val, err := strconv.Atoi(timeoutStr); err == nil {
+ return time.Duration(val) * time.Millisecond
+ }
+ return 10 * time.Second
+}
+
+// createSuccessResponse returns a response for metrics.Name stamped with the
+// current time in milliseconds, the success code, the message "success" and an
+// empty, non-nil list of rows.
+func (hc *HTTPCollector) createSuccessResponse(metrics *job.Metrics) *job.CollectRepMetricsData {
+	data := new(job.CollectRepMetricsData)
+	data.Metrics = metrics.Name
+	data.Time = time.Now().UnixMilli()
+	data.Code = constants.CollectSuccess
+	data.Msg = "success"
+	data.Values = make([]job.ValueRow, 0)
+	return data
+}
+
+// createFailResponse returns a response for metrics.Name stamped with the
+// current time in milliseconds and carrying the given failure code and message.
+func (hc *HTTPCollector) createFailResponse(metrics *job.Metrics, code int, msg string) *job.CollectRepMetricsData {
+	data := new(job.CollectRepMetricsData)
+	data.Metrics = metrics.Name
+	data.Time = time.Now().UnixMilli()
+	data.Code = code
+	data.Msg = msg
+	return data
+}
diff --git a/internal/collector/basic/memcached/.keep
b/internal/protocol/basic/icmp/.keep
similarity index 100%
rename from internal/collector/basic/memcached/.keep
rename to internal/protocol/basic/icmp/.keep
diff --git a/internal/collector/basic/modbus/.keep
b/internal/protocol/basic/imap/.keep
similarity index 100%
rename from internal/collector/basic/modbus/.keep
rename to internal/protocol/basic/imap/.keep
diff --git a/internal/collector/basic/mqtt/.keep
b/internal/protocol/basic/ipmi2/.keep
similarity index 100%
rename from internal/collector/basic/mqtt/.keep
rename to internal/protocol/basic/ipmi2/.keep
diff --git a/internal/collector/basic/nginx/.keep
b/internal/protocol/basic/memcached/.keep
similarity index 100%
rename from internal/collector/basic/nginx/.keep
rename to internal/protocol/basic/memcached/.keep
diff --git a/internal/collector/basic/ntp/.keep
b/internal/protocol/basic/modbus/.keep
similarity index 100%
rename from internal/collector/basic/ntp/.keep
rename to internal/protocol/basic/modbus/.keep
diff --git a/internal/collector/basic/plc/.keep
b/internal/protocol/basic/mqtt/.keep
similarity index 100%
rename from internal/collector/basic/plc/.keep
rename to internal/protocol/basic/mqtt/.keep
diff --git a/internal/collector/basic/pop3/.keep
b/internal/protocol/basic/nginx/.keep
similarity index 100%
rename from internal/collector/basic/pop3/.keep
rename to internal/protocol/basic/nginx/.keep
diff --git a/internal/collector/basic/prometheus/.keep
b/internal/protocol/basic/ntp/.keep
similarity index 100%
rename from internal/collector/basic/prometheus/.keep
rename to internal/protocol/basic/ntp/.keep
diff --git a/internal/collector/basic/push/.keep
b/internal/protocol/basic/plc/.keep
similarity index 100%
rename from internal/collector/basic/push/.keep
rename to internal/protocol/basic/plc/.keep
diff --git a/internal/collector/basic/redfish/.keep
b/internal/protocol/basic/pop3/.keep
similarity index 100%
rename from internal/collector/basic/redfish/.keep
rename to internal/protocol/basic/pop3/.keep
diff --git a/internal/collector/basic/s7/.keep
b/internal/protocol/basic/prometheus/.keep
similarity index 100%
rename from internal/collector/basic/s7/.keep
rename to internal/protocol/basic/prometheus/.keep
diff --git a/internal/collector/basic/script/.keep
b/internal/protocol/basic/push/.keep
similarity index 100%
rename from internal/collector/basic/script/.keep
rename to internal/protocol/basic/push/.keep
diff --git a/internal/collector/basic/sd/.keep
b/internal/protocol/basic/redfish/.keep
similarity index 100%
rename from internal/collector/basic/sd/.keep
rename to internal/protocol/basic/redfish/.keep
diff --git a/internal/collector/basic/redis/redis_collector.go
b/internal/protocol/basic/redis/redis_collector.go
similarity index 93%
rename from internal/collector/basic/redis/redis_collector.go
rename to internal/protocol/basic/redis/redis_collector.go
index 9b23741..8b592f4 100644
--- a/internal/collector/basic/redis/redis_collector.go
+++ b/internal/protocol/basic/redis/redis_collector.go
@@ -30,10 +30,10 @@ import (
"github.com/redis/go-redis/v9"
"golang.org/x/crypto/ssh"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/strategy"
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
+ protocol2
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol"
- jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
sshhelper
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/ssh"
@@ -143,7 +143,7 @@ func (rc *RedisCollector) Collect(metrics
*jobtypes.Metrics) *jobtypes.CollectRe
return response
}
-func (rc *RedisCollector) collectSingle(ctx context.Context, metrics
*jobtypes.Metrics, config *protocol.RedisProtocol, dialer func(context.Context,
string, string) (net.Conn, error), timeout time.Duration, response
*jobtypes.CollectRepMetricsData, startTime time.Time) {
+func (rc *RedisCollector) collectSingle(ctx context.Context, metrics
*jobtypes.Metrics, config *protocol2.RedisProtocol, dialer
func(context.Context, string, string) (net.Conn, error), timeout time.Duration,
response *jobtypes.CollectRepMetricsData, startTime time.Time) {
opts := &redis.Options{
Addr: fmt.Sprintf("%s:%s", config.Host, config.Port),
Username: config.Username,
@@ -174,7 +174,7 @@ func (rc *RedisCollector) collectSingle(ctx
context.Context, metrics *jobtypes.M
rc.addValueRow(response, metrics.AliasFields, parseMap)
}
-func (rc *RedisCollector) collectCluster(ctx context.Context, metrics
*jobtypes.Metrics, config *protocol.RedisProtocol, dialer func(context.Context,
string, string) (net.Conn, error), timeout time.Duration, response
*jobtypes.CollectRepMetricsData, startTime time.Time) {
+func (rc *RedisCollector) collectCluster(ctx context.Context, metrics
*jobtypes.Metrics, config *protocol2.RedisProtocol, dialer
func(context.Context, string, string) (net.Conn, error), timeout time.Duration,
response *jobtypes.CollectRepMetricsData, startTime time.Time) {
opts := &redis.ClusterOptions{
Addrs: []string{fmt.Sprintf("%s:%s", config.Host,
config.Port)},
Username: config.Username,
@@ -301,13 +301,13 @@ func (rc *RedisCollector) parseInfo(info string)
map[string]string {
}
// createRedisDialer creates a dialer function that supports SSH tunneling
-func (rc *RedisCollector) createRedisDialer(sshTunnel *protocol.SSHTunnel)
(func(context.Context, string, string) (net.Conn, error), error) {
+func (rc *RedisCollector) createRedisDialer(sshTunnel *protocol2.SSHTunnel)
(func(context.Context, string, string) (net.Conn, error), error) {
if sshTunnel == nil || sshTunnel.Enable != "true" {
return nil, nil
}
// Create SSH config
- sshConfig := &protocol.SSHProtocol{
+ sshConfig := &protocol2.SSHProtocol{
Host: sshTunnel.Host,
Port: sshTunnel.Port,
Username: sshTunnel.Username,
diff --git a/internal/collector/basic/redis/redis_collector_test.go
b/internal/protocol/basic/redis/redis_collector_test.go
similarity index 97%
rename from internal/collector/basic/redis/redis_collector_test.go
rename to internal/protocol/basic/redis/redis_collector_test.go
index 56f7b2c..d611a32 100644
--- a/internal/collector/basic/redis/redis_collector_test.go
+++ b/internal/protocol/basic/redis/redis_collector_test.go
@@ -26,10 +26,10 @@ import (
"github.com/alicebob/miniredis/v2"
"github.com/stretchr/testify/assert"
- jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
- loggertype
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol"
+ loggertype
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
diff --git a/internal/collector/basic/smtp/.keep
b/internal/protocol/basic/s7/.keep
similarity index 100%
rename from internal/collector/basic/smtp/.keep
rename to internal/protocol/basic/s7/.keep
diff --git a/internal/collector/basic/snmp/.keep
b/internal/protocol/basic/script/.keep
similarity index 100%
rename from internal/collector/basic/snmp/.keep
rename to internal/protocol/basic/script/.keep
diff --git a/internal/collector/basic/telnet/.keep
b/internal/protocol/basic/sd/.keep
similarity index 100%
rename from internal/collector/basic/telnet/.keep
rename to internal/protocol/basic/sd/.keep
diff --git a/internal/collector/basic/udp/.keep
b/internal/protocol/basic/smtp/.keep
similarity index 100%
rename from internal/collector/basic/udp/.keep
rename to internal/protocol/basic/smtp/.keep
diff --git a/internal/collector/basic/websocket/.keep
b/internal/protocol/basic/snmp/.keep
similarity index 100%
rename from internal/collector/basic/websocket/.keep
rename to internal/protocol/basic/snmp/.keep
diff --git a/internal/collector/basic/ssh/ssh_collector.go
b/internal/protocol/basic/ssh/ssh_collector.go
similarity index 98%
rename from internal/collector/basic/ssh/ssh_collector.go
rename to internal/protocol/basic/ssh/ssh_collector.go
index a611ded..fcf2cd5 100644
--- a/internal/collector/basic/ssh/ssh_collector.go
+++ b/internal/protocol/basic/ssh/ssh_collector.go
@@ -26,10 +26,10 @@ import (
"strings"
"time"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
- jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/strategy"
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
sshhelper
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/ssh"
diff --git a/internal/collector/common/cache/.keep
b/internal/protocol/basic/telnet/.keep
similarity index 100%
rename from internal/collector/common/cache/.keep
rename to internal/protocol/basic/telnet/.keep
diff --git a/internal/collector/common/dispatcher/entrance/.keep
b/internal/protocol/basic/udp/.keep
similarity index 100%
rename from internal/collector/common/dispatcher/entrance/.keep
rename to internal/protocol/basic/udp/.keep
diff --git a/internal/collector/common/dispatcher/exporter/.keep
b/internal/protocol/basic/websocket/.keep
similarity index 100%
rename from internal/collector/common/dispatcher/exporter/.keep
rename to internal/protocol/basic/websocket/.keep
diff --git a/internal/collector/extension/kafka/.keep
b/internal/protocol/extension/kafka/.keep
similarity index 100%
rename from internal/collector/extension/kafka/.keep
rename to internal/protocol/extension/kafka/.keep
diff --git a/internal/collector/extension/milvus/milvus_collector.go
b/internal/protocol/extension/milvus/milvus_collector.go
similarity index 96%
rename from internal/collector/extension/milvus/milvus_collector.go
rename to internal/protocol/extension/milvus/milvus_collector.go
index 6f69317..8935ff1 100644
--- a/internal/collector/extension/milvus/milvus_collector.go
+++ b/internal/protocol/extension/milvus/milvus_collector.go
@@ -27,9 +27,9 @@ import (
"time"
"github.com/milvus-io/milvus-sdk-go/v2/client"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/strategy"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
diff --git a/internal/collector/extension/milvus/milvus_collector_test.go
b/internal/protocol/extension/milvus/milvus_collector_test.go
similarity index 51%
rename from internal/collector/extension/milvus/milvus_collector_test.go
rename to internal/protocol/extension/milvus/milvus_collector_test.go
index 137d4c3..7aa5433 100644
--- a/internal/collector/extension/milvus/milvus_collector_test.go
+++ b/internal/protocol/extension/milvus/milvus_collector_test.go
@@ -20,32 +20,32 @@
package milvus
import (
- "os"
- "testing"
+ "os"
+ "testing"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
- loggertypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
- "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol"
+ loggertypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
func TestMilvusCollector_Collect(t *testing.T) {
- metrics := &job.Metrics{
- Name: "milvus_test",
- Milvus: &protocol.MilvusProtocol{
- Host: "localhost",
- Port: "19530",
- },
- AliasFields: []string{"version", "responseTime", "host", "port"},
- }
+ metrics := &job.Metrics{
+ Name: "milvus_test",
+ Milvus: &protocol.MilvusProtocol{
+ Host: "localhost",
+ Port: "19530",
+ },
+ AliasFields: []string{"version", "responseTime", "host",
"port"},
+ }
- l := logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo)
- collector := NewMilvusCollector(l)
- result := collector.Collect(metrics)
+ l := logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo)
+ collector := NewMilvusCollector(l)
+ result := collector.Collect(metrics)
- if result.Code != 0 {
- t.Logf("Collect failed: %s", result.Msg)
- } else {
- t.Logf("Collect success: %+v", result.Values)
- }
+ if result.Code != 0 {
+ t.Logf("Collect failed: %s", result.Msg)
+ } else {
+ t.Logf("Collect success: %+v", result.Values)
+ }
}
diff --git a/internal/collector/extension/mongodb/.keep
b/internal/protocol/extension/mongodb/.keep
similarity index 100%
rename from internal/collector/extension/mongodb/.keep
rename to internal/protocol/extension/mongodb/.keep
diff --git a/internal/collector/extension/nebulagraph/.keep
b/internal/protocol/extension/nebulagraph/.keep
similarity index 100%
rename from internal/collector/extension/nebulagraph/.keep
rename to internal/protocol/extension/nebulagraph/.keep
diff --git a/internal/collector/extension/rocketmq/.keep
b/internal/protocol/extension/rocketmq/.keep
similarity index 100%
rename from internal/collector/extension/rocketmq/.keep
rename to internal/protocol/extension/rocketmq/.keep
diff --git a/internal/collector/basic/standard/imports.go
b/internal/protocol/standard/imports.go
similarity index 67%
rename from internal/collector/basic/standard/imports.go
rename to internal/protocol/standard/imports.go
index 5eb3217..ae47d46 100644
--- a/internal/collector/basic/standard/imports.go
+++ b/internal/protocol/standard/imports.go
@@ -20,11 +20,11 @@
package standard
import (
- _
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/database"
- _
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/http"
- _
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/redis"
- _
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/ssh"
+ _
"hertzbeat.apache.org/hertzbeat-collector-go/internal/protocol/basic/database"
+ _
"hertzbeat.apache.org/hertzbeat-collector-go/internal/protocol/basic/http"
+ _
"hertzbeat.apache.org/hertzbeat-collector-go/internal/protocol/basic/redis"
+ _
"hertzbeat.apache.org/hertzbeat-collector-go/internal/protocol/basic/ssh"
// extensions
- _
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/extension/milvus"
+ _
"hertzbeat.apache.org/hertzbeat-collector-go/internal/protocol/extension/milvus"
)
diff --git a/internal/collector/common/server/server.go
b/internal/server/server.go
similarity index 88%
rename from internal/collector/common/server/server.go
rename to internal/server/server.go
index 0994f9e..f06a5fe 100644
--- a/internal/collector/common/server/server.go
+++ b/internal/server/server.go
@@ -22,8 +22,8 @@ package server
import (
"io"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config"
- logger2
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
+ logger2
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
diff --git a/internal/collector/common/server/server_test.go
b/internal/server/server_test.go
similarity index 100%
rename from internal/collector/common/server/server_test.go
rename to internal/server/server_test.go
diff --git a/internal/transport/processors.go b/internal/transport/processors.go
index 408a0b5..d969f48 100644
--- a/internal/transport/processors.go
+++ b/internal/transport/processors.go
@@ -25,8 +25,8 @@ import (
"time"
pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
- jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
- loggertype
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
+ loggertype
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/crypto"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
diff --git a/internal/transport/transport.go b/internal/transport/transport.go
new file mode 100644
index 0000000..807630a
--- /dev/null
+++ b/internal/transport/transport.go
@@ -0,0 +1,276 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package transport
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "os"
+
+ pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
+ config2 "hertzbeat.apache.org/hertzbeat-collector-go/internal/config"
+ clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector"
+ configtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
+ loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+const (
+ // DefaultManagerAddr is the default manager server address (Java Netty default port)
+ DefaultManagerAddr = "127.0.0.1:1158"
+ // DefaultProtocol is the default communication protocol for Java compatibility
+ DefaultProtocol = "netty"
+ // DefaultMode is the default operation mode
+ DefaultMode = "public"
+ // DefaultIdentity is the default collector identity
+ DefaultIdentity = "collector-go"
+)
+
+type Runner struct {
+ Config *configtypes.CollectorConfig
+ client TransportClient
+ jobScheduler JobScheduler
+ clrserver.Server
+}
+
+func New(cfg *configtypes.CollectorConfig) *Runner {
+ return &Runner{
+ Config: cfg,
+ Server: clrserver.Server{
+ Logger: logger.Logger{}, // Will be initialized properly in Start method
+ },
+ }
+}
+
+// SetJobScheduler sets the job scheduler for the transport runner
+func (r *Runner) SetJobScheduler(scheduler JobScheduler) {
+ r.jobScheduler = scheduler
+}
+
+// NewFromConfig creates a new transport runner from collector configuration
+func NewFromConfig(cfg *configtypes.CollectorConfig) *Runner {
+ if cfg == nil {
+ return nil
+ }
+ return New(cfg)
+}
+
+// NewFromEnv creates a new transport runner from environment variables
+func NewFromEnv() *Runner {
+ envLoader := config2.NewEnvConfigLoader()
+ cfg := envLoader.LoadFromEnv()
+ return NewFromConfig(cfg)
+}
+
+// NewFromUnifiedConfig creates a new transport runner using unified configuration loading
+// It loads from file first, then overrides with environment variables
+func NewFromUnifiedConfig(cfgPath string) (*Runner, error) {
+ unifiedLoader := config2.NewUnifiedConfigLoader(cfgPath)
+ cfg, err := unifiedLoader.Load()
+ if err != nil {
+ return nil, err
+ }
+ return NewFromConfig(cfg), nil
+}
+
+func (r *Runner) Start(ctx context.Context) error {
+ // 初始化 Logger 如果它还没有被设置
+ if r.Logger.IsZero() {
+ r.Logger = logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo)
+ }
+ r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", r.Info().Name)
+ r.Logger.Info("Starting transport client")
+
+ // 构建 server 地址
+ addr := fmt.Sprintf("%s:%s", r.Config.Collector.Manager.Host, r.Config.Collector.Manager.Port)
+ if addr == ":" {
+ // 如果配置为空,使用环境变量或默认值
+ if v := os.Getenv("MANAGER_ADDR"); v != "" {
+ addr = v
+ } else {
+ addr = DefaultManagerAddr
+ }
+ }
+
+ // 确定协议
+ protocol := r.Config.Collector.Manager.Protocol
+ if protocol == "" {
+ if v := os.Getenv("MANAGER_PROTOCOL"); v != "" {
+ protocol = v
+ } else {
+ protocol = DefaultProtocol
+ }
+ }
+
+ r.Logger.Info("Connecting to manager server", "addr", addr, "protocol", protocol)
+
+ // 创建客户端
+ factory := &TransportClientFactory{}
+ client, err := factory.CreateClient(protocol, addr)
+ if err != nil {
+ r.Logger.Error(err, "Failed to create transport client")
+ return err
+ }
+
+ // Set the identity on the client if it supports it
+ identity := r.Config.Collector.Identity
+ if identity == "" {
+ identity = DefaultIdentity
+ }
+
+ if nettyClient, ok := client.(*NettyClient); ok {
+ nettyClient.SetIdentity(identity)
+ }
+
+ r.client = client
+
+ // 设置事件处理器
+ switch c := client.(type) {
+ case *GrpcClient:
+ c.SetEventHandler(func(event Event) {
+ switch event.Type {
+ case EventConnected:
+ r.Logger.Info("Connected to manager gRPC server", "addr", event.Address)
+ go r.sendOnlineMessage()
+ case EventDisconnected:
+ r.Logger.Info("Disconnected from manager gRPC server", "addr", event.Address)
+ case EventConnectFailed:
+ r.Logger.Error(event.Error, "Failed to connect to manager gRPC server", "addr", event.Address)
+ }
+ })
+ // Register processors with job scheduler
+ if r.jobScheduler != nil {
+ RegisterDefaultProcessors(c, r.jobScheduler)
+ r.Logger.Info("Registered gRPC processors with job scheduler")
+ } else {
+ RegisterDefaultProcessors(c, nil)
+ r.Logger.Info("Registered gRPC processors without job scheduler")
+ }
+ case *NettyClient:
+ c.SetEventHandler(func(event Event) {
+ switch event.Type {
+ case EventConnected:
+ r.Logger.Info("Connected to manager netty server", "addr", event.Address)
+ go r.sendOnlineMessage()
+ case EventDisconnected:
+ r.Logger.Info("Disconnected from manager netty server", "addr", event.Address)
+ case EventConnectFailed:
+ r.Logger.Error(event.Error, "Failed to connect to manager netty server", "addr", event.Address)
+ }
+ })
+ // Register processors with job scheduler
+ if r.jobScheduler != nil {
+ RegisterDefaultNettyProcessors(c, r.jobScheduler)
+ r.Logger.Info("Registered netty processors with job scheduler")
+ } else {
+ RegisterDefaultNettyProcessors(c, nil)
+ r.Logger.Info("Registered netty processors without job scheduler")
+ }
+ }
+
+ if err := r.client.Start(); err != nil {
+ r.Logger.Error(err, "Failed to start transport client")
+ return err
+ }
+
+ // 创建新的context用于监控关闭信号
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ // 监听 ctx.Done 优雅关闭
+ go func() {
+ <-ctx.Done()
+ r.Logger.Info("Shutting down transport client...")
+ _ = r.client.Shutdown()
+ }()
+
+ // 阻塞直到 ctx.Done
+ <-ctx.Done()
+ return nil
+}
+
+func (r *Runner) sendOnlineMessage() {
+ if r.client != nil && r.client.IsStarted() {
+ // Use the configured identity
+ identity := r.Config.Collector.Identity
+ if identity == "" {
+ identity = DefaultIdentity
+ }
+
+ // Create CollectorInfo JSON structure as expected by Java server
+ mode := r.Config.Collector.Mode
+ if mode == "" {
+ mode = DefaultMode // Default mode as in Java version
+ }
+
+ collectorInfo := map[string]interface{}{
+ "name": identity,
+ "ip": "", // Let server detect IP
+ "version": "1.0.0",
+ "mode": mode,
+ }
+
+ // Convert to JSON bytes
+ jsonData, err := json.Marshal(collectorInfo)
+ if err != nil {
+ r.Logger.Error(err, "Failed to marshal collector info to JSON")
+ return
+ }
+
+ onlineMsg := &pb.Message{
+ Type: pb.MessageType_GO_ONLINE,
+ Direction: pb.Direction_REQUEST,
+ Identity: identity,
+ Msg: jsonData,
+ }
+
+ r.Logger.Info("Sending online message", "identity", identity, "type", onlineMsg.Type)
+
+ if err := r.client.SendMsg(onlineMsg); err != nil {
+ r.Logger.Error(err, "Failed to send online message", "identity", identity)
+ } else {
+ r.Logger.Info("Online message sent successfully", "identity", identity)
+ }
+ }
+}
+
+func (r *Runner) Info() collector.Info {
+ return collector.Info{
+ Name: "transport",
+ }
+}
+
+func (r *Runner) Close() error {
+ r.Logger.Info("transport close...")
+ if r.client != nil {
+ _ = r.client.Shutdown()
+ }
+ return nil
+}
+
+// GetClient returns the transport client (for testing and advanced usage)
+func (r *Runner) GetClient() TransportClient {
+ return r.client
+}
+
+// IsConnected returns whether the client is connected and started
+func (r *Runner) IsConnected() bool {
+ return r.client != nil && r.client.IsStarted()
+}
diff --git a/internal/collector/common/types/collector/collector_types.go
b/internal/types/collector/collector_types.go
similarity index 100%
rename from internal/collector/common/types/collector/collector_types.go
rename to internal/types/collector/collector_types.go
diff --git a/internal/collector/common/types/config/config_types.go
b/internal/types/config/config_types.go
similarity index 100%
rename from internal/collector/common/types/config/config_types.go
rename to internal/types/config/config_types.go
diff --git a/internal/collector/common/types/err/error_types.go
b/internal/types/err/error_types.go
similarity index 100%
rename from internal/collector/common/types/err/error_types.go
rename to internal/types/err/error_types.go
diff --git a/internal/collector/common/types/job/job_types.go
b/internal/types/job/job_types.go
similarity index 100%
rename from internal/collector/common/types/job/job_types.go
rename to internal/types/job/job_types.go
diff --git a/internal/collector/common/types/job/metrics_types.go
b/internal/types/job/metrics_types.go
similarity index 88%
rename from internal/collector/common/types/job/metrics_types.go
rename to internal/types/job/metrics_types.go
index ea591f3..207bf07 100644
--- a/internal/collector/common/types/job/metrics_types.go
+++ b/internal/types/job/metrics_types.go
@@ -21,7 +21,7 @@ import (
"fmt"
"time"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
+ protocol2
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol"
)
// Metrics represents a metric configuration
@@ -46,14 +46,14 @@ type Metrics struct {
HasSubTask bool `json:"hasSubTask"`
// Protocol specific fields
- HTTP *protocol.HTTPProtocol `json:"http,omitempty"`
- SSH *protocol.SSHProtocol `json:"ssh,omitempty"`
- JDBC *protocol.JDBCProtocol `json:"jdbc,omitempty"`
- SNMP *protocol.SNMPProtocol `json:"snmp,omitempty"`
- JMX *protocol.JMXProtocol `json:"jmx,omitempty"`
- Redis *protocol.RedisProtocol `json:"redis,omitempty"`
- MongoDB *protocol.MongoDBProtocol `json:"mongodb,omitempty"`
- Milvus *protocol.MilvusProtocol `json:"milvus,omitempty"`
+ HTTP *protocol2.HTTPProtocol `json:"http,omitempty"`
+ SSH *protocol2.SSHProtocol `json:"ssh,omitempty"`
+ JDBC *protocol2.JDBCProtocol `json:"jdbc,omitempty"`
+ SNMP *protocol2.SNMPProtocol `json:"snmp,omitempty"`
+ JMX *protocol2.JMXProtocol `json:"jmx,omitempty"`
+ Redis *protocol2.RedisProtocol `json:"redis,omitempty"`
+ MongoDB *protocol2.MongoDBProtocol `json:"mongodb,omitempty"`
+ Milvus *protocol2.MilvusProtocol `json:"milvus,omitempty"`
}
// Field represents a metric field
@@ -218,18 +218,18 @@ type SSHProtocol struct {
// JDBCProtocol represents JDBC protocol configuration
type JDBCProtocol struct {
- Host string `json:"host"`
- Port string `json:"port"`
- Platform string `json:"platform"`
- Username string `json:"username"`
- Password string `json:"password"`
- Database string `json:"database"`
- Timeout string `json:"timeout"`
- QueryType string `json:"queryType"`
- SQL string `json:"sql"`
- URL string `json:"url"`
- ReuseConnection string `json:"reuseConnection"`
- SSHTunnel *protocol.SSHTunnel `json:"sshTunnel,omitempty"`
+ Host string `json:"host"`
+ Port string `json:"port"`
+ Platform string `json:"platform"`
+ Username string `json:"username"`
+ Password string `json:"password"`
+ Database string `json:"database"`
+ Timeout string `json:"timeout"`
+ QueryType string `json:"queryType"`
+ SQL string `json:"sql"`
+ URL string `json:"url"`
+ ReuseConnection string `json:"reuseConnection"`
+ SSHTunnel *protocol2.SSHTunnel `json:"sshTunnel,omitempty"`
}
// SNMPProtocol represents SNMP protocol configuration
@@ -262,13 +262,13 @@ type JMXProtocol struct {
// RedisProtocol represents Redis protocol configuration
type RedisProtocol struct {
- Host string `json:"host"`
- Port string `json:"port"`
- Username string `json:"username"`
- Password string `json:"password"`
- Pattern string `json:"pattern"`
- Timeout string `json:"timeout"`
- SSHTunnel *protocol.SSHTunnel `json:"sshTunnel,omitempty"`
+ Host string `json:"host"`
+ Port string `json:"port"`
+ Username string `json:"username"`
+ Password string `json:"password"`
+ Pattern string `json:"pattern"`
+ Timeout string `json:"timeout"`
+ SSHTunnel *protocol2.SSHTunnel `json:"sshTunnel,omitempty"`
}
// MongoDBProtocol represents MongoDB protocol configuration
diff --git
a/internal/collector/common/types/job/protocol/common_request_protocol.go
b/internal/types/job/protocol/common_request_protocol.go
similarity index 100%
rename from
internal/collector/common/types/job/protocol/common_request_protocol.go
rename to internal/types/job/protocol/common_request_protocol.go
diff --git a/internal/collector/common/types/job/protocol/consul_sd_protocol.go
b/internal/types/job/protocol/consul_sd_protocol.go
similarity index 100%
rename from internal/collector/common/types/job/protocol/consul_sd_protocol.go
rename to internal/types/job/protocol/consul_sd_protocol.go
diff --git a/internal/collector/common/types/job/protocol/http_protocol.go
b/internal/types/job/protocol/http_protocol.go
similarity index 100%
rename from internal/collector/common/types/job/protocol/http_protocol.go
rename to internal/types/job/protocol/http_protocol.go
diff --git a/internal/collector/common/types/job/protocol/jdbc_protocol.go
b/internal/types/job/protocol/jdbc_protocol.go
similarity index 100%
rename from internal/collector/common/types/job/protocol/jdbc_protocol.go
rename to internal/types/job/protocol/jdbc_protocol.go
diff --git a/internal/collector/common/types/job/protocol/jmx_protocol.go
b/internal/types/job/protocol/jmx_protocol.go
similarity index 100%
rename from internal/collector/common/types/job/protocol/jmx_protocol.go
rename to internal/types/job/protocol/jmx_protocol.go
diff --git a/internal/collector/common/types/job/protocol/milvus_protocol.go
b/internal/types/job/protocol/milvus_protocol.go
similarity index 100%
rename from internal/collector/common/types/job/protocol/milvus_protocol.go
rename to internal/types/job/protocol/milvus_protocol.go
diff --git a/internal/collector/common/types/job/protocol/mongodb_protocol.go
b/internal/types/job/protocol/mongodb_protocol.go
similarity index 100%
rename from internal/collector/common/types/job/protocol/mongodb_protocol.go
rename to internal/types/job/protocol/mongodb_protocol.go
diff --git a/internal/collector/common/types/job/protocol/redis_protocol.go
b/internal/types/job/protocol/redis_protocol.go
similarity index 100%
rename from internal/collector/common/types/job/protocol/redis_protocol.go
rename to internal/types/job/protocol/redis_protocol.go
diff --git a/internal/collector/common/types/job/protocol/snmp_protocol.go
b/internal/types/job/protocol/snmp_protocol.go
similarity index 100%
rename from internal/collector/common/types/job/protocol/snmp_protocol.go
rename to internal/types/job/protocol/snmp_protocol.go
diff --git a/internal/collector/common/types/job/protocol/ssh_protocol.go
b/internal/types/job/protocol/ssh_protocol.go
similarity index 100%
rename from internal/collector/common/types/job/protocol/ssh_protocol.go
rename to internal/types/job/protocol/ssh_protocol.go
diff --git a/internal/collector/common/types/job/protocol/ssh_tunnel.go
b/internal/types/job/protocol/ssh_tunnel.go
similarity index 100%
rename from internal/collector/common/types/job/protocol/ssh_tunnel.go
rename to internal/types/job/protocol/ssh_tunnel.go
diff --git
a/internal/collector/common/types/job/protocol/zookeeper_sd_protocol.go
b/internal/types/job/protocol/zookeeper_sd_protocol.go
similarity index 100%
rename from
internal/collector/common/types/job/protocol/zookeeper_sd_protocol.go
rename to internal/types/job/protocol/zookeeper_sd_protocol.go
diff --git a/internal/collector/common/types/job/timeout_types.go
b/internal/types/job/timeout_types.go
similarity index 100%
rename from internal/collector/common/types/job/timeout_types.go
rename to internal/types/job/timeout_types.go
diff --git a/internal/collector/common/types/logger/logging_types.go
b/internal/types/logger/logging_types.go
similarity index 100%
rename from internal/collector/common/types/logger/logging_types.go
rename to internal/types/logger/logging_types.go
diff --git a/internal/util/arrow/arrow_serializer.go
b/internal/util/arrow/arrow_serializer.go
index 1915b43..0b78f5a 100644
--- a/internal/util/arrow/arrow_serializer.go
+++ b/internal/util/arrow/arrow_serializer.go
@@ -24,7 +24,7 @@ import (
"encoding/json"
"fmt"
- jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+ jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
diff --git a/internal/util/crypto/aes_util.go b/internal/util/crypto/aes_util.go
index 678760f..1567c56 100644
--- a/internal/util/crypto/aes_util.go
+++ b/internal/util/crypto/aes_util.go
@@ -26,7 +26,7 @@ import (
"sync"
"unicode"
- loggertype
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+ loggertype
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
diff --git a/internal/util/logger/logger.go b/internal/util/logger/logger.go
index 67d4652..cc657eb 100644
--- a/internal/util/logger/logger.go
+++ b/internal/util/logger/logger.go
@@ -18,65 +18,64 @@
package logger
import (
- "io"
- "os"
-
- "github.com/go-logr/logr"
- "github.com/go-logr/zapr"
- "go.uber.org/zap"
- "go.uber.org/zap/zapcore"
-
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+ "io"
+ "os"
+
+ "github.com/go-logr/logr"
+ "github.com/go-logr/zapr"
+ "go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
)
type Logger struct {
- logr.Logger
- out io.Writer
- logging *logger.HertzBeatLogging
- sugaredLogger *zap.SugaredLogger
+ logr.Logger
+ out io.Writer
+ logging *logger.HertzBeatLogging
+ sugaredLogger *zap.SugaredLogger
}
func NewLogger(w io.Writer, logging *logger.HertzBeatLogging) Logger {
- logger := initZapLogger(w, logging,
logging.Level[logger.LogComponentHertzbeatDefault])
+ logger := initZapLogger(w, logging,
logging.Level[logger.LogComponentHertzbeatDefault])
- return Logger{
- Logger: zapr.NewLogger(logger),
- out: w,
- logging: logging,
- sugaredLogger: logger.Sugar(),
- }
+ return Logger{
+ Logger: zapr.NewLogger(logger),
+ out: w,
+ logging: logging,
+ sugaredLogger: logger.Sugar(),
+ }
}
func FileLogger(file, name string, level logger.LogLevel) Logger {
- writer, err := os.OpenFile(file, os.O_WRONLY, 0o666)
- if err != nil {
- panic(err)
- }
+ writer, err := os.OpenFile(file, os.O_WRONLY, 0o666)
+ if err != nil {
+ panic(err)
+ }
- logging := logger.DefaultHertzbeatLogging()
- logger := initZapLogger(writer, logging, level)
+ logging := logger.DefaultHertzbeatLogging()
+ logger := initZapLogger(writer, logging, level)
- return Logger{
- Logger: zapr.NewLogger(logger).WithName(name),
- logging: logging,
- out: writer,
- sugaredLogger: logger.Sugar(),
- }
+ return Logger{
+ Logger: zapr.NewLogger(logger).WithName(name),
+ logging: logging,
+ out: writer,
+ sugaredLogger: logger.Sugar(),
+ }
}
func DefaultLogger(out io.Writer, level logger.LogLevel) Logger {
- logging := logger.DefaultHertzbeatLogging()
- logger := initZapLogger(out, logging, level)
+ logging := logger.DefaultHertzbeatLogging()
+ logger := initZapLogger(out, logging, level)
- return Logger{
- Logger: zapr.NewLogger(logger),
- out: out,
- logging: logging,
- sugaredLogger: logger.Sugar(),
- }
+ return Logger{
+ Logger: zapr.NewLogger(logger),
+ out: out,
+ logging: logging,
+ sugaredLogger: logger.Sugar(),
+ }
}
// WithName returns a new Logger instance with the specified name element added
@@ -86,23 +85,23 @@ func DefaultLogger(out io.Writer, level logger.LogLevel)
Logger {
// more information).
func (l Logger) WithName(name string) Logger {
- logLevel := l.logging.Level[logger.HertzbeatLogComponent(name)]
- logger := initZapLogger(l.out, l.logging, logLevel)
+ logLevel := l.logging.Level[logger.HertzbeatLogComponent(name)]
+ logger := initZapLogger(l.out, l.logging, logLevel)
- return Logger{
- Logger: zapr.NewLogger(logger).WithName(name),
- logging: l.logging,
- out: l.out,
- sugaredLogger: logger.Sugar().Named(name),
- }
+ return Logger{
+ Logger: zapr.NewLogger(logger).WithName(name),
+ logging: l.logging,
+ out: l.out,
+ sugaredLogger: logger.Sugar().Named(name),
+ }
}
// WithValues returns a new Logger instance with additional key/value pairs.
// See Info for documentation on how key/value pairs work.
func (l Logger) WithValues(keysAndValues ...interface{}) Logger {
- l.Logger = l.Logger.WithValues(keysAndValues...)
- return l
+ l.Logger = l.Logger.WithValues(keysAndValues...)
+ return l
}
// A Sugar wraps the base Logger functionality in a slower, but less
@@ -125,13 +124,13 @@ func (l Logger) WithValues(keysAndValues ...interface{})
Logger {
// Infoln(...any) Println-style logger
func (l Logger) Sugar() *zap.SugaredLogger {
- return l.sugaredLogger
+ return l.sugaredLogger
}
func initZapLogger(w io.Writer, logging *logger.HertzBeatLogging, level
logger.LogLevel) *zap.Logger {
- parseLevel, _ :=
zapcore.ParseLevel(string(logging.DefaultHertzBeatLoggingLevel(level)))
- core :=
zapcore.NewCore(zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()),
zapcore.AddSync(w), zap.NewAtomicLevelAt(parseLevel))
+ parseLevel, _ :=
zapcore.ParseLevel(string(logging.DefaultHertzBeatLoggingLevel(level)))
+ core :=
zapcore.NewCore(zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()),
zapcore.AddSync(w), zap.NewAtomicLevelAt(parseLevel))
- return zap.New(core, zap.AddCaller())
+ return zap.New(core, zap.AddCaller())
}
diff --git a/internal/util/logger/logger_test.go
b/internal/util/logger/logger_test.go
index c3dd2cf..b3c7844 100644
--- a/internal/util/logger/logger_test.go
+++ b/internal/util/logger/logger_test.go
@@ -26,7 +26,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
- loggertype
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+ loggertype
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
)
func TestZapLogLevel(t *testing.T) {
diff --git a/internal/util/param/param_replacer.go
b/internal/util/param/param_replacer.go
index 984e8cd..4ee423f 100644
--- a/internal/util/param/param_replacer.go
+++ b/internal/util/param/param_replacer.go
@@ -26,9 +26,9 @@ import (
"strconv"
"strings"
- jobtypes
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
- loggertype
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+ "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
+ protocol2
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol"
+ loggertype
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/crypto"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
@@ -44,7 +44,7 @@ func NewReplacer() *Replacer {
// PreprocessJobPasswords decrypts passwords in job configmap once during job
creation
// This permanently replaces encrypted passwords with decrypted ones in the
job configmap
-func (r *Replacer) PreprocessJobPasswords(job *jobtypes.Job) error {
+func (r *Replacer) PreprocessJobPasswords(job *job.Job) error {
if job == nil || job.Configmap == nil {
return nil
}
@@ -71,7 +71,7 @@ func (r *Replacer) PreprocessJobPasswords(job *jobtypes.Job)
error {
}
// ReplaceJobParams replaces all parameter placeholders in job configuration
-func (r *Replacer) ReplaceJobParams(job *jobtypes.Job) (*jobtypes.Job, error) {
+func (r *Replacer) ReplaceJobParams(job *job.Job) (*job.Job, error) {
if job == nil {
return nil, fmt.Errorf("job is nil")
}
@@ -97,7 +97,7 @@ func (r *Replacer) ReplaceJobParams(job *jobtypes.Job)
(*jobtypes.Job, error) {
// createParamMapSimple creates a parameter map from configmap entries
// Assumes passwords have already been decrypted by PreprocessJobPasswords
-func (r *Replacer) createParamMapSimple(configmap []jobtypes.Configmap)
map[string]string {
+func (r *Replacer) createParamMapSimple(configmap []job.Configmap)
map[string]string {
paramMap := make(map[string]string)
for _, config := range configmap {
@@ -129,7 +129,7 @@ func (r *Replacer) createParamMapSimple(configmap
[]jobtypes.Configmap) map[stri
// ReplaceMetricsParams replaces parameters in metrics configuration
// Exported method to be called by MetricsCollector
-func (r *Replacer) ReplaceMetricsParams(metrics *jobtypes.Metrics, paramMap
map[string]string) error {
+func (r *Replacer) ReplaceMetricsParams(metrics *job.Metrics, paramMap
map[string]string) error {
// 1. JDBC
if metrics.JDBC != nil {
r.replaceJDBCParams(metrics.JDBC, paramMap)
@@ -159,7 +159,7 @@ func (r *Replacer) ReplaceMetricsParams(metrics
*jobtypes.Metrics, paramMap map[
}
// replaceHTTPParams specific replacement logic for HTTPProtocol struct
-func (r *Replacer) replaceHTTPParams(http *protocol.HTTPProtocol, paramMap
map[string]string) {
+func (r *Replacer) replaceHTTPParams(http *protocol2.HTTPProtocol, paramMap
map[string]string) {
http.URL = r.replaceParamPlaceholders(http.URL, paramMap)
http.Method = r.replaceParamPlaceholders(http.Method, paramMap)
http.Body = r.replaceParamPlaceholders(http.Body, paramMap)
@@ -207,7 +207,7 @@ func (r *Replacer) replaceHTTPParams(http
*protocol.HTTPProtocol, paramMap map[s
}
// replaceRedisParams specific replacement logic for RedisProtocol struct
-func (r *Replacer) replaceRedisParams(redis *protocol.RedisProtocol, paramMap
map[string]string) {
+func (r *Replacer) replaceRedisParams(redis *protocol2.RedisProtocol, paramMap
map[string]string) {
redis.Host = r.replaceParamPlaceholders(redis.Host, paramMap)
redis.Port = r.replaceParamPlaceholders(redis.Port, paramMap)
redis.Username = r.replaceParamPlaceholders(redis.Username, paramMap)
@@ -223,7 +223,7 @@ func (r *Replacer) replaceRedisParams(redis
*protocol.RedisProtocol, paramMap ma
}
// replaceSSHParams specific replacement logic for SSHProtocol struct
-func (r *Replacer) replaceSSHParams(ssh *protocol.SSHProtocol, paramMap
map[string]string) {
+func (r *Replacer) replaceSSHParams(ssh *protocol2.SSHProtocol, paramMap
map[string]string) {
ssh.Host = r.replaceParamPlaceholders(ssh.Host, paramMap)
ssh.Port = r.replaceParamPlaceholders(ssh.Port, paramMap)
ssh.Username = r.replaceParamPlaceholders(ssh.Username, paramMap)
@@ -244,7 +244,7 @@ func (r *Replacer) replaceSSHParams(ssh
*protocol.SSHProtocol, paramMap map[stri
}
// replaceJDBCParams specific replacement logic for JDBCProtocol struct
-func (r *Replacer) replaceJDBCParams(jdbc *protocol.JDBCProtocol, paramMap
map[string]string) {
+func (r *Replacer) replaceJDBCParams(jdbc *protocol2.JDBCProtocol, paramMap
map[string]string) {
jdbc.Host = r.replaceParamPlaceholders(jdbc.Host, paramMap)
jdbc.Port = r.replaceParamPlaceholders(jdbc.Port, paramMap)
jdbc.Platform = r.replaceParamPlaceholders(jdbc.Platform, paramMap)
@@ -265,7 +265,7 @@ func (r *Replacer) replaceJDBCParams(jdbc
*protocol.JDBCProtocol, paramMap map[s
}
// replaceBasicMetricsParams replaces parameters in basic metrics fields
-func (r *Replacer) replaceBasicMetricsParams(metrics *jobtypes.Metrics,
paramMap map[string]string) error {
+func (r *Replacer) replaceBasicMetricsParams(metrics *job.Metrics, paramMap
map[string]string) error {
metrics.Host = r.replaceParamPlaceholders(metrics.Host, paramMap)
metrics.Port = r.replaceParamPlaceholders(metrics.Port, paramMap)
metrics.Timeout = r.replaceParamPlaceholders(metrics.Timeout, paramMap)
@@ -321,14 +321,14 @@ func (r *Replacer)
ExtractProtocolConfig(protocolInterface interface{}, targetSt
}
// ExtractJDBCConfig extracts and processes JDBC configuration
-func (r *Replacer) ExtractJDBCConfig(jdbcInterface interface{})
(*protocol.JDBCProtocol, error) {
+func (r *Replacer) ExtractJDBCConfig(jdbcInterface interface{})
(*protocol2.JDBCProtocol, error) {
if jdbcInterface == nil {
return nil, nil
}
- if jdbcConfig, ok := jdbcInterface.(*protocol.JDBCProtocol); ok {
+ if jdbcConfig, ok := jdbcInterface.(*protocol2.JDBCProtocol); ok {
return jdbcConfig, nil
}
- var jdbcConfig protocol.JDBCProtocol
+ var jdbcConfig protocol2.JDBCProtocol
if err := r.ExtractProtocolConfig(jdbcInterface, &jdbcConfig); err !=
nil {
return nil, fmt.Errorf("failed to extract JDBC config: %w", err)
}
@@ -336,14 +336,14 @@ func (r *Replacer) ExtractJDBCConfig(jdbcInterface
interface{}) (*protocol.JDBCP
}
// ExtractHTTPConfig extracts and processes HTTP configuration
-func (r *Replacer) ExtractHTTPConfig(httpInterface interface{})
(*protocol.HTTPProtocol, error) {
+func (r *Replacer) ExtractHTTPConfig(httpInterface interface{})
(*protocol2.HTTPProtocol, error) {
if httpInterface == nil {
return nil, nil
}
- if httpConfig, ok := httpInterface.(*protocol.HTTPProtocol); ok {
+ if httpConfig, ok := httpInterface.(*protocol2.HTTPProtocol); ok {
return httpConfig, nil
}
- var httpConfig protocol.HTTPProtocol
+ var httpConfig protocol2.HTTPProtocol
if err := r.ExtractProtocolConfig(httpInterface, &httpConfig); err !=
nil {
return nil, fmt.Errorf("failed to extract HTTP config: %w", err)
}
@@ -351,14 +351,14 @@ func (r *Replacer) ExtractHTTPConfig(httpInterface
interface{}) (*protocol.HTTPP
}
// ExtractSSHConfig extracts and processes SSH configuration
-func (r *Replacer) ExtractSSHConfig(sshInterface interface{})
(*protocol.SSHProtocol, error) {
+func (r *Replacer) ExtractSSHConfig(sshInterface interface{})
(*protocol2.SSHProtocol, error) {
if sshInterface == nil {
return nil, nil
}
- if sshConfig, ok := sshInterface.(*protocol.SSHProtocol); ok {
+ if sshConfig, ok := sshInterface.(*protocol2.SSHProtocol); ok {
return sshConfig, nil
}
- var sshConfig protocol.SSHProtocol
+ var sshConfig protocol2.SSHProtocol
if err := r.ExtractProtocolConfig(sshInterface, &sshConfig); err != nil
{
return nil, fmt.Errorf("failed to extract SSH config: %w", err)
}
diff --git a/internal/util/ssh/ssh_helper.go b/internal/util/ssh/ssh_helper.go
index 77b7d21..03bc7ce 100644
--- a/internal/util/ssh/ssh_helper.go
+++ b/internal/util/ssh/ssh_helper.go
@@ -28,8 +28,8 @@ import (
"strings"
"golang.org/x/crypto/ssh"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
diff --git a/internal/util/ssh/ssh_helper_test.go
b/internal/util/ssh/ssh_helper_test.go
index 4cbc6fd..9bda9d8 100644
--- a/internal/util/ssh/ssh_helper_test.go
+++ b/internal/util/ssh/ssh_helper_test.go
@@ -31,9 +31,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/crypto/ssh"
+
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol"
+ loggertype
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
-
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
- loggertype
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]