This is an automated email from the ASF dual-hosted git repository. shown pushed a commit to branch 1212-yuluo/refactor in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git
commit 67fb14657927d7a1cb0c9766556b65b5f84b9df9 Author: yuluo-yx <[email protected]> AuthorDate: Fri Dec 12 23:01:01 2025 +0800 refactor: adjust layout Signed-off-by: yuluo-yx <[email protected]> --- cmd/main.go | 16 +- internal/banner/embed.go | 4 +- internal/cmd/server.go | 16 +- internal/collector/basic/http/http_collector.go | 596 --------------------- internal/collector/common/transport/transport.go | 277 ---------- internal/{collector => }/config/config.go | 6 +- internal/{collector => }/config/config_factory.go | 4 +- internal/{collector => }/config/config_test.go | 0 internal/{collector => }/config/env_config.go | 4 +- internal/{collector => }/config/unified_config.go | 4 +- internal/{collector/basic/dns => job/cache}/.keep | 0 .../collect/dispatch/metrics_collector.go | 6 +- .../collect/dispatch/metrics_collector_test.go | 4 +- .../common => job}/collect/lazy_message_router.go | 2 +- .../collect/metrics/hertzbeat_metrics_collector.go | 0 .../common => job}/collect/result_handler.go | 6 +- .../collect/strategy/strategy_factory.go | 2 +- .../common => job}/dispatcher/common_dispatcher.go | 4 +- .../basic/ftp => job/dispatcher/entrance}/.keep | 0 .../basic/icmp => job/dispatcher/exporter}/.keep | 0 .../dispatcher/hashed_wheel_timer.go | 2 +- .../common => job}/dispatcher/time_wheel.go | 22 +- .../common => job}/dispatcher/wheel_timer_task.go | 14 +- .../common => job}/router/message_router.go | 18 +- .../common/job => job/server}/job_server.go | 16 +- .../collector_types.go => metrics/metrics.go} | 6 +- .../basic/database/jdbc_collector.go | 14 +- .../basic/database/jdbc_collector_test.go | 0 .../basic/imap => protocol/basic/dns}/.keep | 0 .../basic/ipmi2 => protocol/basic/ftp}/.keep | 0 internal/protocol/basic/http/http_collector.go | 596 +++++++++++++++++++++ .../basic/memcached => protocol/basic/icmp}/.keep | 0 .../basic/modbus => protocol/basic/imap}/.keep | 0 .../basic/mqtt => protocol/basic/ipmi2}/.keep | 0 .../basic/nginx => protocol/basic/memcached}/.keep | 0 .../basic/ntp => protocol/basic/modbus}/.keep | 0 .../basic/plc => protocol/basic/mqtt}/.keep | 0 .../basic/pop3 => protocol/basic/nginx}/.keep | 0 .../basic/prometheus => protocol/basic/ntp}/.keep | 0 .../basic/push => protocol/basic/plc}/.keep | 0 .../basic/redfish => protocol/basic/pop3}/.keep | 0 .../basic/s7 => protocol/basic/prometheus}/.keep | 0 .../basic/script => protocol/basic/push}/.keep | 0 .../basic/sd => protocol/basic/redfish}/.keep | 0 .../basic/redis/redis_collector.go | 14 +- .../basic/redis/redis_collector_test.go | 6 +- .../basic/smtp => protocol/basic/s7}/.keep | 0 .../basic/snmp => protocol/basic/script}/.keep | 0 .../basic/telnet => protocol/basic/sd}/.keep | 0 .../basic/udp => protocol/basic/smtp}/.keep | 0 .../basic/websocket => protocol/basic/snmp}/.keep | 0 .../basic/ssh/ssh_collector.go | 6 +- .../common/cache => protocol/basic/telnet}/.keep | 0 .../entrance => protocol/basic/udp}/.keep | 0 .../exporter => protocol/basic/websocket}/.keep | 0 .../{collector => protocol}/extension/kafka/.keep | 0 .../extension/milvus/milvus_collector.go | 4 +- .../extension/milvus/milvus_collector_test.go | 44 +- .../extension/mongodb/.keep | 0 .../extension/nebulagraph/.keep | 0 .../extension/rocketmq/.keep | 0 .../basic => protocol}/standard/imports.go | 10 +- internal/{collector/common => }/server/server.go | 4 +- .../{collector/common => }/server/server_test.go | 0 internal/transport/processors.go | 4 +- internal/transport/transport.go | 276 ++++++++++ .../common => }/types/collector/collector_types.go | 0 .../common => }/types/config/config_types.go | 0 .../common => }/types/err/error_types.go | 0 .../{collector/common => }/types/job/job_types.go | 0 .../common => }/types/job/metrics_types.go | 56 +- .../types/job/protocol/common_request_protocol.go | 0 .../types/job/protocol/consul_sd_protocol.go | 0 .../types/job/protocol/http_protocol.go | 0 .../types/job/protocol/jdbc_protocol.go | 0 .../common => }/types/job/protocol/jmx_protocol.go | 0 .../types/job/protocol/milvus_protocol.go | 0 .../types/job/protocol/mongodb_protocol.go | 0 .../types/job/protocol/redis_protocol.go | 0 .../types/job/protocol/snmp_protocol.go | 0 .../common => }/types/job/protocol/ssh_protocol.go | 0 .../common => }/types/job/protocol/ssh_tunnel.go | 0 .../types/job/protocol/zookeeper_sd_protocol.go | 0 .../common => }/types/job/timeout_types.go | 0 .../common => }/types/logger/logging_types.go | 0 internal/util/arrow/arrow_serializer.go | 2 +- internal/util/crypto/aes_util.go | 2 +- internal/util/logger/logger.go | 107 ++-- internal/util/logger/logger_test.go | 2 +- internal/util/param/param_replacer.go | 42 +- internal/util/ssh/ssh_helper.go | 2 +- internal/util/ssh/ssh_helper_test.go | 4 +- 92 files changed, 1109 insertions(+), 1115 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 2b497b4..8acd589 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -18,17 +18,17 @@ package main import ( - "fmt" - "os" + "fmt" + "os" - "hertzbeat.apache.org/hertzbeat-collector-go/cmd/root" - _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/standard" + "hertzbeat.apache.org/hertzbeat-collector-go/cmd/root" + _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/protocol/standard" ) func main() { - if err := root.GetRootCommand().Execute(); err != nil { - _, _ = fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } + if err := root.GetRootCommand().Execute(); err != nil { + _, _ = fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } } diff --git a/internal/banner/embed.go b/internal/banner/embed.go index e18bdd1..f3c3aa2 100644 --- a/internal/banner/embed.go +++ b/internal/banner/embed.go @@ -24,8 +24,8 @@ import ( "strconv" "text/template" - clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server" - bannertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/err" + clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server" + bannertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/err" ) const ( diff --git a/internal/cmd/server.go b/internal/cmd/server.go index 687f52a..5980263 100644 --- a/internal/cmd/server.go +++ b/internal/cmd/server.go @@ -28,16 +28,16 @@ import ( "syscall" "github.com/spf13/cobra" + cfgloader "hertzbeat.apache.org/hertzbeat-collector-go/internal/config" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect" + jobserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/server" + clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server" + transportserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport" + collectortypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector" + configtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config" + collectorerr "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/err" bannerouter "hertzbeat.apache.org/hertzbeat-collector-go/internal/banner" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect" - jobserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/job" - clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server" - transportserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/transport" - collectortypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector" - configtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config" - collectorerr "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/err" - cfgloader "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config" ) var ( diff --git a/internal/collector/basic/http/http_collector.go b/internal/collector/basic/http/http_collector.go deleted file mode 100644 index 32632a5..0000000 --- a/internal/collector/basic/http/http_collector.go +++ /dev/null @@ -1,596 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package http - -import ( - "bytes" - "crypto/md5" - "crypto/rand" - "crypto/tls" - "encoding/hex" - "encoding/json" - "fmt" - "io" - "net/http" - "strconv" - "strings" - "time" - - "github.com/prometheus/common/expfmt" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" -) - -func init() { - strategy.RegisterFactory(ProtocolHTTP, func(logger logger.Logger) strategy.Collector { - return NewHTTPCollector(logger) - }) -} - -const ( - ProtocolHTTP = "http" - - // Parse Types - ParseTypeDefault = "default" - ParseTypeJsonPath = "jsonPath" - ParseTypePrometheus = "prometheus" - ParseTypeWebsite = "website" - ParseTypeHeader = "header" - - // Auth Types - AuthTypeBasic = "Basic Auth" - AuthTypeBearer = "Bearer Token" - AuthTypeDigest = "Digest Auth" -) - -type HTTPCollector struct { - logger logger.Logger -} - -func NewHTTPCollector(log logger.Logger) *HTTPCollector { - return &HTTPCollector{ - logger: log.WithName("http-collector"), - } -} - -func (hc *HTTPCollector) Protocol() string { - return ProtocolHTTP -} - -func (hc *HTTPCollector) PreCheck(metrics *job.Metrics) error { - if metrics == nil || metrics.HTTP == nil { - return fmt.Errorf("http configuration is missing") - } - return nil -} - -func (hc *HTTPCollector) Collect(metrics *job.Metrics) *job.CollectRepMetricsData { - start := time.Now() - - // MetricsCollector has already performed parameter replacement on metrics.HTTP - httpConfig := metrics.HTTP - - // 1. Prepare URL - targetURL := httpConfig.URL - - // Parse SSL bool string - isSSL := false - if httpConfig.SSL != "" { - if val, err := strconv.ParseBool(httpConfig.SSL); err == nil { - isSSL = val - } - } - - if targetURL == "" { - schema := "http" - if isSSL { - schema = "https" - } - // Use metrics.ConfigMap values if URL is empty - // Note: metrics.ConfigMap is already populated with job.Configmap values - host := metrics.ConfigMap["host"] - port := metrics.ConfigMap["port"] - if host != "" && port != "" { - targetURL = fmt.Sprintf("%s://%s:%s", schema, host, port) - } - } - - if targetURL != "" && !strings.HasPrefix(targetURL, "http") { - schema := "http" - if isSSL { - schema = "https" - } - targetURL = fmt.Sprintf("%s://%s", schema, targetURL) - } - - if targetURL == "" { - return hc.createFailResponse(metrics, constants.CollectFail, "target URL is empty") - } - - // 2. Create Request - req, err := hc.createRequest(httpConfig, targetURL) - if err != nil { - return hc.createFailResponse(metrics, constants.CollectFail, fmt.Sprintf("failed to create request: %v", err)) - } - - // 3. Create Client - timeoutStr := metrics.Timeout - if httpConfig.Timeout != "" { - timeoutStr = httpConfig.Timeout - } - - timeout := hc.getTimeout(timeoutStr) - client := &http.Client{ - Timeout: timeout, - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - }, - } - - // 4. Execute Request (with Digest retry logic) - resp, err := client.Do(req) - - // Handle Digest Auth Challenge (401) - if err == nil && resp.StatusCode == http.StatusUnauthorized && - httpConfig.Authorization != nil && - strings.EqualFold(httpConfig.Authorization.Type, AuthTypeDigest) { - - // Close the first response body - io.Copy(io.Discard, resp.Body) - resp.Body.Close() - - authHeader := resp.Header.Get("WWW-Authenticate") - if authHeader != "" { - // Create new request with Authorization header - digestReq, digestErr := hc.handleDigestAuth(req, authHeader, httpConfig.Authorization) - if digestErr == nil { - // Retry request - resp, err = client.Do(digestReq) - } else { - hc.logger.Error(digestErr, "failed to handle digest auth") - } - } - } - - if err != nil { - return hc.createFailResponse(metrics, constants.CollectUnReachable, fmt.Sprintf("request failed: %v", err)) - } - defer resp.Body.Close() - - responseTime := time.Since(start).Milliseconds() - - // 5. Parse Response - parseType := httpConfig.ParseType - if parseType == "" { - parseType = ParseTypeDefault - } - - var responseData *job.CollectRepMetricsData - - // Read body - bodyBytes, err := io.ReadAll(resp.Body) - if err != nil { - return hc.createFailResponse(metrics, constants.CollectFail, fmt.Sprintf("failed to read body: %v", err)) - } - - switch parseType { - case ParseTypePrometheus: - responseData, err = hc.parsePrometheus(bodyBytes, metrics) - case ParseTypeWebsite: - responseData, err = hc.parseWebsite(bodyBytes, resp.StatusCode, responseTime, metrics, httpConfig) - case ParseTypeHeader: - responseData, err = hc.parseHeader(resp.Header, metrics) - case ParseTypeJsonPath, ParseTypeDefault: - responseData, err = hc.parseJsonPath(bodyBytes, metrics, responseTime, httpConfig) - default: - responseData, err = hc.parseJsonPath(bodyBytes, metrics, responseTime, httpConfig) - } - - if err != nil { - hc.logger.Error(err, "parse response failed", "type", parseType) - return hc.createFailResponse(metrics, constants.CollectFail, fmt.Sprintf("parse error: %v", err)) - } - - hc.fillCommonFields(responseData, metrics.AliasFields, responseTime, resp.StatusCode) - - return responseData -} - -// handleDigestAuth generates a new request with the Digest Authorization header -func (hc *HTTPCollector) handleDigestAuth(originalReq *http.Request, authHeader string, authConfig *protocol.Authorization) (*http.Request, error) { - params := hc.parseAuthHeader(authHeader) - realm := params["realm"] - nonce := params["nonce"] - qop := params["qop"] - opaque := params["opaque"] - algorithm := params["algorithm"] - if algorithm == "" { - algorithm = "MD5" - } - - if realm == "" || nonce == "" { - return nil, fmt.Errorf("missing realm or nonce in digest header") - } - - username := authConfig.DigestAuthUsername - password := authConfig.DigestAuthPassword - ha1 := hc.md5Hex(fmt.Sprintf("%s:%s:%s", username, realm, password)) - - method := originalReq.Method - uri := originalReq.URL.RequestURI() - ha2 := hc.md5Hex(fmt.Sprintf("%s:%s", method, uri)) - - nc := "00000001" - cnonce := hc.generateCnonce() - - var responseStr string - if qop == "" { - responseStr = hc.md5Hex(fmt.Sprintf("%s:%s:%s", ha1, nonce, ha2)) - } else if strings.Contains(qop, "auth") { - responseStr = hc.md5Hex(fmt.Sprintf("%s:%s:%s:%s:%s:%s", ha1, nonce, nc, cnonce, "auth", ha2)) - } else { - return nil, fmt.Errorf("unsupported qop: %s", qop) - } - - authVal := fmt.Sprintf(`Digest username="%s", realm="%s", nonce="%s", uri="%s", response="%s", algorithm="%s"`, - username, realm, nonce, uri, responseStr, algorithm) - - if opaque != "" { - authVal += fmt.Sprintf(`, opaque="%s"`, opaque) - } - if qop != "" { - authVal += fmt.Sprintf(`, qop=auth, nc=%s, cnonce="%s"`, nc, cnonce) - } - - var newReq *http.Request - if originalReq.GetBody != nil { - body, _ := originalReq.GetBody() - newReq, _ = http.NewRequest(originalReq.Method, originalReq.URL.String(), body) - } else { - newReq, _ = http.NewRequest(originalReq.Method, originalReq.URL.String(), nil) - } - - for k, v := range originalReq.Header { - newReq.Header[k] = v - } - - newReq.Header.Set("Authorization", authVal) - return newReq, nil -} - -func (hc *HTTPCollector) parseAuthHeader(header string) map[string]string { - header = strings.TrimPrefix(header, "Digest ") - result := make(map[string]string) - - pairs := strings.Split(header, ",") - for _, pair := range pairs { - parts := strings.SplitN(strings.TrimSpace(pair), "=", 2) - if len(parts) == 2 { - key := strings.TrimSpace(parts[0]) - val := strings.TrimSpace(parts[1]) - val = strings.Trim(val, "\"") - result[key] = val - } - } - return result -} - -func (hc *HTTPCollector) md5Hex(s string) string { - hash := md5.Sum([]byte(s)) - return hex.EncodeToString(hash[:]) -} - -func (hc *HTTPCollector) generateCnonce() string { - b := make([]byte, 8) - io.ReadFull(rand.Reader, b) - return hex.EncodeToString(b) -} - -func (hc *HTTPCollector) createRequest(config *protocol.HTTPProtocol, targetURL string) (*http.Request, error) { - method := strings.ToUpper(config.Method) - if method == "" { - method = "GET" - } - - var body io.Reader - if config.Body != "" { - body = strings.NewReader(config.Body) - } - - req, err := http.NewRequest(method, targetURL, body) - if err != nil { - return nil, err - } - - for k, v := range config.Headers { - req.Header.Set(k, v) - } - - if config.Authorization != nil { - auth := config.Authorization - if auth.Type == "" || strings.EqualFold(auth.Type, AuthTypeBasic) { - if auth.BasicAuthUsername != "" && auth.BasicAuthPassword != "" { - req.SetBasicAuth(auth.BasicAuthUsername, auth.BasicAuthPassword) - } - } else if strings.EqualFold(auth.Type, AuthTypeBearer) { - if auth.BearerTokenToken != "" { - req.Header.Set("Authorization", "Bearer "+auth.BearerTokenToken) - } - } - } - - if len(config.Params) > 0 { - q := req.URL.Query() - for k, v := range config.Params { - q.Add(k, v) - } - req.URL.RawQuery = q.Encode() - } - - return req, nil -} - -// --- Parsers --- - -func (hc *HTTPCollector) parsePrometheus(body []byte, metrics *job.Metrics) (*job.CollectRepMetricsData, error) { - response := hc.createSuccessResponse(metrics) - var parser expfmt.TextParser - metricFamilies, err := parser.TextToMetricFamilies(bytes.NewReader(body)) - if err != nil { - return nil, err - } - now := time.Now().UnixMilli() - for _, mf := range metricFamilies { - for _, m := range mf.Metric { - row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))} - labels := make(map[string]string) - for _, pair := range m.Label { - labels[pair.GetName()] = pair.GetValue() - } - hasValue := false - for i, field := range metrics.AliasFields { - if field == constants.NULL_VALUE { - continue - } - if field == "value" || field == "prom_value" { - if m.Gauge != nil { - row.Columns[i] = fmt.Sprintf("%f", m.Gauge.GetValue()) - } else if m.Counter != nil { - row.Columns[i] = fmt.Sprintf("%f", m.Counter.GetValue()) - } else if m.Untyped != nil { - row.Columns[i] = fmt.Sprintf("%f", m.Untyped.GetValue()) - } - hasValue = true - } else { - if val, ok := labels[field]; ok { - row.Columns[i] = val - hasValue = true - } else { - row.Columns[i] = constants.NULL_VALUE - } - } - } - if hasValue { - response.Values = append(response.Values, row) - } - } - } - response.Time = now - return response, nil -} - -func (hc *HTTPCollector) parseWebsite(body []byte, statusCode int, responseTime int64, metrics *job.Metrics, httpConfig *protocol.HTTPProtocol) (*job.CollectRepMetricsData, error) { - response := hc.createSuccessResponse(metrics) - row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))} - keyword := httpConfig.Keyword - keywordCount := 0 - if keyword != "" { - keywordCount = strings.Count(string(body), keyword) - } - for i, field := range metrics.AliasFields { - switch strings.ToLower(field) { - case strings.ToLower(constants.StatusCode): - row.Columns[i] = strconv.Itoa(statusCode) - case strings.ToLower(constants.RESPONSE_TIME): - row.Columns[i] = strconv.FormatInt(responseTime, 10) - case "keyword": - row.Columns[i] = strconv.Itoa(keywordCount) - default: - row.Columns[i] = constants.NULL_VALUE - } - } - response.Values = append(response.Values, row) - return response, nil -} - -func (hc *HTTPCollector) parseHeader(header http.Header, metrics *job.Metrics) (*job.CollectRepMetricsData, error) { - response := hc.createSuccessResponse(metrics) - row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))} - for i, field := range metrics.AliasFields { - val := header.Get(field) - if val != "" { - row.Columns[i] = val - } else { - row.Columns[i] = constants.NULL_VALUE - } - } - response.Values = append(response.Values, row) - return response, nil -} - -func (hc *HTTPCollector) parseJsonPath(body []byte, metrics *job.Metrics, responseTime int64, httpConfig *protocol.HTTPProtocol) (*job.CollectRepMetricsData, error) { - response := hc.createSuccessResponse(metrics) - var data interface{} - decoder := json.NewDecoder(bytes.NewReader(body)) - decoder.UseNumber() - if err := decoder.Decode(&data); err != nil { - return response, nil - } - parseScript := httpConfig.ParseScript - root := data - if parseScript != "" && parseScript != "$" { - root = hc.navigateJson(data, parseScript) - } - if root == nil { - return response, nil - } - if arr, ok := root.([]interface{}); ok { - for _, item := range arr { - row := hc.extractRow(item, metrics.AliasFields) - response.Values = append(response.Values, row) - } - } else { - row := hc.extractRow(root, metrics.AliasFields) - response.Values = append(response.Values, row) - } - return response, nil -} - -func (hc *HTTPCollector) navigateJson(data interface{}, path string) interface{} { - path = strings.TrimPrefix(path, "$.") - if path == "" { - return data - } - parts := strings.Split(path, ".") - current := data - for _, part := range parts { - if current == nil { - return nil - } - key := part - idx := -1 - if i := strings.Index(part, "["); i > -1 && strings.HasSuffix(part, "]") { - key = part[:i] - idxStr := part[i+1 : len(part)-1] - if val, err := strconv.Atoi(idxStr); err == nil { - idx = val - } - } - if key != "" { - if m, ok := current.(map[string]interface{}); ok { - if val, exists := m[key]; exists { - current = val - } else { - return nil - } - } else { - return nil - } - } - if idx > -1 { - if arr, ok := current.([]interface{}); ok { - if idx < len(arr) { - current = arr[idx] - } else { - return nil - } - } else { - return nil - } - } - } - return current -} - -func (hc *HTTPCollector) extractRow(item interface{}, fields []string) job.ValueRow { - row := job.ValueRow{Columns: make([]string, len(fields))} - m, isMap := item.(map[string]interface{}) - for i, field := range fields { - if strings.EqualFold(field, constants.RESPONSE_TIME) || strings.EqualFold(field, constants.StatusCode) { - row.Columns[i] = constants.NULL_VALUE - continue - } - var val interface{} - if isMap { - if v, ok := m[field]; ok { - val = v - } else { - val = hc.navigateJson(item, field) - } - } - if val != nil { - row.Columns[i] = fmt.Sprintf("%v", val) - } else { - row.Columns[i] = constants.NULL_VALUE - } - } - return row -} - -func (hc *HTTPCollector) fillCommonFields(resp *job.CollectRepMetricsData, fields []string, responseTime int64, statusCode int) { - if resp == nil { - return - } - if len(resp.Fields) == 0 { - resp.Fields = make([]job.Field, len(fields)) - for i, f := range fields { - resp.Fields[i] = job.Field{Field: f, Type: constants.TYPE_STRING} - } - } - if len(resp.Values) == 0 { - row := job.ValueRow{Columns: make([]string, len(fields))} - for k := range row.Columns { - row.Columns[k] = constants.NULL_VALUE - } - resp.Values = append(resp.Values, row) - } - for i := range resp.Values { - for j, field := range fields { - if strings.EqualFold(field, constants.RESPONSE_TIME) { - resp.Values[i].Columns[j] = strconv.FormatInt(responseTime, 10) - } - if strings.EqualFold(field, constants.StatusCode) { - resp.Values[i].Columns[j] = strconv.Itoa(statusCode) - } - } - } -} - -func (hc *HTTPCollector) getTimeout(timeoutStr string) time.Duration { - if timeoutStr == "" { - return 10 * time.Second - } - if val, err := strconv.Atoi(timeoutStr); err == nil { - return time.Duration(val) * time.Millisecond - } - return 10 * time.Second -} - -func (hc *HTTPCollector) createSuccessResponse(metrics *job.Metrics) *job.CollectRepMetricsData { - return &job.CollectRepMetricsData{ - Metrics: metrics.Name, - Time: time.Now().UnixMilli(), - Code: constants.CollectSuccess, - Msg: "success", - Values: make([]job.ValueRow, 0), - } -} - -func (hc *HTTPCollector) createFailResponse(metrics *job.Metrics, code int, msg string) *job.CollectRepMetricsData { - return &job.CollectRepMetricsData{ - Metrics: metrics.Name, - Time: time.Now().UnixMilli(), - Code: code, - Msg: msg, - } -} diff --git a/internal/collector/common/transport/transport.go b/internal/collector/common/transport/transport.go deleted file mode 100644 index f2edf73..0000000 --- a/internal/collector/common/transport/transport.go +++ /dev/null @@ -1,277 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package transport - -import ( - "context" - "encoding/json" - "fmt" - "os" - - pb "hertzbeat.apache.org/hertzbeat-collector-go/api" - clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector" - configtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config" - loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" -) - -const ( - // DefaultManagerAddr is the default manager server address (Java Netty default port) - DefaultManagerAddr = "127.0.0.1:1158" - // DefaultProtocol is the default communication protocol for Java compatibility - DefaultProtocol = "netty" - // DefaultMode is the default operation mode - DefaultMode = "public" - // DefaultIdentity is the default collector identity - DefaultIdentity = "collector-go" -) - -type Runner struct { - Config *configtypes.CollectorConfig - client transport.TransportClient - jobScheduler transport.JobScheduler - clrserver.Server -} - -func New(cfg *configtypes.CollectorConfig) *Runner { - return &Runner{ - Config: cfg, - Server: clrserver.Server{ - Logger: logger.Logger{}, // Will be initialized properly in Start method - }, - } -} - -// SetJobScheduler sets the job scheduler for the transport runner -func (r *Runner) SetJobScheduler(scheduler transport.JobScheduler) { - r.jobScheduler = scheduler -} - -// NewFromConfig creates a new transport runner from collector configuration -func NewFromConfig(cfg *configtypes.CollectorConfig) *Runner { - if cfg == nil { - return nil - } - return New(cfg) -} - -// NewFromEnv creates a new transport runner from environment variables -func NewFromEnv() *Runner { - envLoader := config.NewEnvConfigLoader() - cfg := envLoader.LoadFromEnv() - return NewFromConfig(cfg) -} - -// NewFromUnifiedConfig creates a new transport runner using unified configuration loading -// It loads from file first, then overrides with environment variables -func NewFromUnifiedConfig(cfgPath string) (*Runner, error) { - unifiedLoader := config.NewUnifiedConfigLoader(cfgPath) - cfg, err := unifiedLoader.Load() - if err != nil { - return nil, err - } - return NewFromConfig(cfg), nil -} - -func (r *Runner) Start(ctx context.Context) error { - // 初始化 Logger 如果它还没有被设置 - if r.Logger.IsZero() { - r.Logger = logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo) - } - r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", r.Info().Name) - r.Logger.Info("Starting transport client") - - // 构建 server 地址 - addr := fmt.Sprintf("%s:%s", r.Config.Collector.Manager.Host, r.Config.Collector.Manager.Port) - if addr == ":" { - // 如果配置为空,使用环境变量或默认值 - if v := os.Getenv("MANAGER_ADDR"); v != "" { - addr = v - } else { - addr = DefaultManagerAddr - } - } - - // 确定协议 - protocol := r.Config.Collector.Manager.Protocol - if protocol == "" { - if v := os.Getenv("MANAGER_PROTOCOL"); v != "" { - protocol = v - } else { - protocol = DefaultProtocol - } - } - - r.Logger.Info("Connecting to manager server", "addr", addr, "protocol", protocol) - - // 创建客户端 - factory := &transport.TransportClientFactory{} - client, err := factory.CreateClient(protocol, addr) - if err != nil { - r.Logger.Error(err, "Failed to create transport client") - return err - } - - // Set the identity on the client if it supports it - identity := r.Config.Collector.Identity - if identity == "" { - identity = DefaultIdentity - } - - if nettyClient, ok := client.(*transport.NettyClient); ok { - nettyClient.SetIdentity(identity) - } - - r.client = client - - // 设置事件处理器 - switch c := client.(type) { - case *transport.GrpcClient: - c.SetEventHandler(func(event transport.Event) { - switch event.Type { - case transport.EventConnected: - r.Logger.Info("Connected to manager gRPC server", "addr", event.Address) - go r.sendOnlineMessage() - case transport.EventDisconnected: - r.Logger.Info("Disconnected from manager gRPC server", "addr", event.Address) - case transport.EventConnectFailed: - r.Logger.Error(event.Error, "Failed to connect to manager gRPC server", "addr", event.Address) - } - }) - // Register processors with job scheduler - if r.jobScheduler != nil { - transport.RegisterDefaultProcessors(c, r.jobScheduler) - r.Logger.Info("Registered gRPC processors with job scheduler") - } else { - transport.RegisterDefaultProcessors(c, nil) - r.Logger.Info("Registered gRPC processors without job scheduler") - } - case *transport.NettyClient: - c.SetEventHandler(func(event transport.Event) { - switch event.Type { - case transport.EventConnected: - r.Logger.Info("Connected to manager netty server", "addr", event.Address) - go r.sendOnlineMessage() - case transport.EventDisconnected: - r.Logger.Info("Disconnected from manager netty server", "addr", event.Address) - case transport.EventConnectFailed: - r.Logger.Error(event.Error, "Failed to connect to manager netty server", "addr", event.Address) - } - }) - // Register processors with job scheduler - if r.jobScheduler != nil { - transport.RegisterDefaultNettyProcessors(c, r.jobScheduler) - r.Logger.Info("Registered netty processors with job scheduler") - } else { - transport.RegisterDefaultNettyProcessors(c, nil) - r.Logger.Info("Registered netty processors without job scheduler") - } - } - - if err := r.client.Start(); err != nil { - r.Logger.Error(err, "Failed to start transport client") - return err - } - - // 创建新的context用于监控关闭信号 - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - // 监听 ctx.Done 优雅关闭 - go func() { - <-ctx.Done() - r.Logger.Info("Shutting down transport client...") - _ = r.client.Shutdown() - }() - - // 阻塞直到 ctx.Done - <-ctx.Done() - return nil -} - -func (r *Runner) sendOnlineMessage() { - if r.client != nil && r.client.IsStarted() { - // Use the configured identity - identity := r.Config.Collector.Identity - if identity == "" { - identity = DefaultIdentity - } - - // Create CollectorInfo JSON structure as expected by Java server - mode := r.Config.Collector.Mode - if mode == "" { - mode = DefaultMode // Default mode as in Java version - } - - collectorInfo := map[string]interface{}{ - "name": identity, - "ip": "", // Let server detect IP - "version": "1.0.0", - "mode": mode, - } - - // Convert to JSON bytes - jsonData, err := json.Marshal(collectorInfo) - if err != nil { - r.Logger.Error(err, "Failed to marshal collector info to JSON") - return - } - - onlineMsg := &pb.Message{ - Type: pb.MessageType_GO_ONLINE, - Direction: pb.Direction_REQUEST, - Identity: identity, - Msg: jsonData, - } - - r.Logger.Info("Sending online message", "identity", identity, "type", onlineMsg.Type) - - if err := r.client.SendMsg(onlineMsg); err != nil { - r.Logger.Error(err, "Failed to send online message", "identity", identity) - } else { - r.Logger.Info("Online message sent successfully", "identity", identity) - } - } -} - -func (r *Runner) Info() collector.Info { - return collector.Info{ - Name: "transport", - } -} - -func (r *Runner) Close() error { - r.Logger.Info("transport close...") - if r.client != nil { - _ = r.client.Shutdown() - } - return nil -} - -// GetClient returns the transport client (for testing and advanced usage) -func (r *Runner) GetClient() transport.TransportClient { - return r.client -} - -// IsConnected returns whether the client is connected and started -func (r *Runner) IsConnected() bool { - return r.client != nil && r.client.IsStarted() -} diff --git a/internal/collector/config/config.go b/internal/config/config.go similarity index 95% rename from internal/collector/config/config.go rename to internal/config/config.go index a5bafdb..a5a299f 100644 --- a/internal/collector/config/config.go +++ b/internal/config/config.go @@ -22,10 +22,10 @@ import ( "os" "gopkg.in/yaml.v3" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config" + collectortypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/err" + loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config" - collectortypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/err" - loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) diff --git a/internal/collector/config/config_factory.go b/internal/config/config_factory.go similarity index 98% rename from internal/collector/config/config_factory.go rename to internal/config/config_factory.go index 47b82b5..7854d4f 100644 --- a/internal/collector/config/config_factory.go +++ b/internal/config/config_factory.go @@ -25,8 +25,8 @@ import ( "strconv" "strings" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config" - loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config" + loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) diff --git a/internal/collector/config/config_test.go b/internal/config/config_test.go similarity index 100% rename from internal/collector/config/config_test.go rename to internal/config/config_test.go diff --git a/internal/collector/config/env_config.go b/internal/config/env_config.go similarity index 96% rename from internal/collector/config/env_config.go rename to internal/config/env_config.go index a81acf3..40c09c1 100644 --- a/internal/collector/config/env_config.go +++ b/internal/config/env_config.go @@ -20,8 +20,8 @@ package config import ( "os" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config" - loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config" + loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) diff --git a/internal/collector/config/unified_config.go b/internal/config/unified_config.go similarity index 97% rename from internal/collector/config/unified_config.go rename to internal/config/unified_config.go index bd0fde5..2640448 100644 --- a/internal/collector/config/unified_config.go +++ b/internal/config/unified_config.go @@ -20,8 +20,8 @@ package config import ( "os" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config" - loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config" + loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) diff --git a/internal/collector/basic/dns/.keep b/internal/job/cache/.keep similarity index 100% rename from internal/collector/basic/dns/.keep rename to internal/job/cache/.keep diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go b/internal/job/collect/dispatch/metrics_collector.go similarity index 98% rename from internal/collector/common/collect/dispatch/metrics_collector.go rename to internal/job/collect/dispatch/metrics_collector.go index 7261ebe..4074275 100644 --- a/internal/collector/common/collect/dispatch/metrics_collector.go +++ b/internal/job/collect/dispatch/metrics_collector.go @@ -29,9 +29,9 @@ import ( "github.com/expr-lang/expr" "github.com/expr-lang/expr/vm" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/strategy" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/unit" @@ -503,7 +503,7 @@ func formatNumber(value float64) string { // CollectMetrics collects metrics using the appropriate collector and returns results via channel // This method uses Go's channel-based concurrency instead of Java's thread pools -func (mc *MetricsCollector) CollectMetrics(metrics *jobtypes.Metrics, job *jobtypes.Job, timeout *jobtypes.Timeout) chan *jobtypes.CollectRepMetricsData { +func (mc *MetricsCollector) CollectMetrics(metrics *jobtypes.Metrics, job *jobtypes.Job, _ *jobtypes.Timeout) chan *jobtypes.CollectRepMetricsData { resultChan := make(chan *jobtypes.CollectRepMetricsData, 1) // Start collection in a goroutine (Go's lightweight thread) diff --git a/internal/collector/common/collect/dispatch/metrics_collector_test.go b/internal/job/collect/dispatch/metrics_collector_test.go similarity index 99% rename from internal/collector/common/collect/dispatch/metrics_collector_test.go rename to internal/job/collect/dispatch/metrics_collector_test.go index bae124d..581de71 100644 --- a/internal/collector/common/collect/dispatch/metrics_collector_test.go +++ b/internal/job/collect/dispatch/metrics_collector_test.go @@ -22,9 +22,9 @@ package dispatch import ( "testing" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" - loggertype "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" + loggertype "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) diff --git a/internal/collector/common/collect/lazy_message_router.go b/internal/job/collect/lazy_message_router.go similarity index 99% rename from internal/collector/common/collect/lazy_message_router.go rename to internal/job/collect/lazy_message_router.go index a038099..9613fb0 100644 --- a/internal/collector/common/collect/lazy_message_router.go +++ b/internal/job/collect/lazy_message_router.go @@ -30,8 +30,8 @@ import ( "github.com/apache/arrow/go/v13/arrow/memory" pb "hertzbeat.apache.org/hertzbeat-collector-go/api" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) diff --git a/internal/collector/common/collect/metrics/hertzbeat_metrics_collector.go b/internal/job/collect/metrics/hertzbeat_metrics_collector.go similarity index 100% rename from internal/collector/common/collect/metrics/hertzbeat_metrics_collector.go rename to internal/job/collect/metrics/hertzbeat_metrics_collector.go diff --git a/internal/collector/common/collect/result_handler.go b/internal/job/collect/result_handler.go similarity index 90% rename from internal/collector/common/collect/result_handler.go rename to internal/job/collect/result_handler.go index cd16820..1592e6c 100644 --- a/internal/collector/common/collect/result_handler.go +++ b/internal/job/collect/result_handler.go @@ -22,8 +22,8 @@ package collect import ( "fmt" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) @@ -35,7 +35,7 @@ type ResultHandlerImpl struct { // ResultHandler interface for handling collection results type ResultHandler interface { - HandleCollectData(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job) error + HandleCollectData(data *job.CollectRepMetricsData, job *job.Job) error } // NewResultHandler creates a new result handler @@ -47,7 +47,7 @@ func NewResultHandler(logger logger.Logger, messageRouter MessageRouter) ResultH } // HandleCollectData processes the collection results -func (rh *ResultHandlerImpl) HandleCollectData(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job) error { +func (rh *ResultHandlerImpl) HandleCollectData(data *job.CollectRepMetricsData, job *job.Job) error { if data == nil { rh.logger.Error(nil, "collect data is nil") return fmt.Errorf("collect data is nil") diff --git a/internal/collector/common/collect/strategy/strategy_factory.go b/internal/job/collect/strategy/strategy_factory.go similarity index 99% rename from internal/collector/common/collect/strategy/strategy_factory.go rename to internal/job/collect/strategy/strategy_factory.go index 9fc9af1..a118aa4 100644 --- a/internal/collector/common/collect/strategy/strategy_factory.go +++ b/internal/job/collect/strategy/strategy_factory.go @@ -21,7 +21,7 @@ import ( "fmt" "sync" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) diff --git a/internal/collector/common/dispatcher/common_dispatcher.go b/internal/job/dispatcher/common_dispatcher.go similarity index 98% rename from internal/collector/common/dispatcher/common_dispatcher.go rename to internal/job/dispatcher/common_dispatcher.go index 288e0c1..eb6afcc 100644 --- a/internal/collector/common/dispatcher/common_dispatcher.go +++ b/internal/job/dispatcher/common_dispatcher.go @@ -25,8 +25,8 @@ import ( "sync" "time" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param" ) diff --git a/internal/collector/basic/ftp/.keep b/internal/job/dispatcher/entrance/.keep similarity index 100% rename from internal/collector/basic/ftp/.keep rename to internal/job/dispatcher/entrance/.keep diff --git a/internal/collector/basic/icmp/.keep b/internal/job/dispatcher/exporter/.keep similarity index 100% rename from internal/collector/basic/icmp/.keep rename to internal/job/dispatcher/exporter/.keep diff --git a/internal/collector/common/dispatcher/hashed_wheel_timer.go b/internal/job/dispatcher/hashed_wheel_timer.go similarity index 99% rename from internal/collector/common/dispatcher/hashed_wheel_timer.go rename to internal/job/dispatcher/hashed_wheel_timer.go index 9100205..c65047d 100644 --- a/internal/collector/common/dispatcher/hashed_wheel_timer.go +++ b/internal/job/dispatcher/hashed_wheel_timer.go @@ -25,7 +25,7 @@ import ( "sync/atomic" "time" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) diff --git a/internal/collector/common/dispatcher/time_wheel.go b/internal/job/dispatcher/time_wheel.go similarity index 95% rename from internal/collector/common/dispatcher/time_wheel.go rename to internal/job/dispatcher/time_wheel.go index d8c7aa1..a93df77 100644 --- a/internal/collector/common/dispatcher/time_wheel.go +++ b/internal/job/dispatcher/time_wheel.go @@ -25,14 +25,14 @@ import ( "sync" "time" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) // JobInfo holds information about a scheduled job type JobInfo struct { - Job *jobtypes.Job - Timeout *jobtypes.Timeout + Job *job.Job + Timeout *job.Timeout CreatedAt time.Time LastExecutedAt time.Time NextExecutionAt time.Time @@ -70,12 +70,12 @@ type DispatcherStats struct { // MetricsTaskDispatcher interface for metrics task dispatching type MetricsTaskDispatcher interface { - DispatchMetricsTask(ctx context.Context, job *jobtypes.Job, timeout *jobtypes.Timeout) error + DispatchMetricsTask(ctx context.Context, job *job.Job, timeout *job.Timeout) error } // HashedWheelTimer interface for time wheel operations type HashedWheelTimer interface { - NewTimeout(task jobtypes.TimerTask, delay time.Duration) *jobtypes.Timeout + NewTimeout(task job.TimerTask, delay time.Duration) *job.Timeout Start(ctx context.Context) error Stop() error GetStats() map[string]interface{} @@ -101,7 +101,7 @@ func NewTimeDispatch(logger logger.Logger, commonDispatcher MetricsTaskDispatche // AddJob adds a job to the time-based scheduler // This corresponds to Java's TimerDispatcher.addJob method -func (td *TimeDispatch) AddJob(job *jobtypes.Job) error { +func (td *TimeDispatch) AddJob(job *job.Job) error { if job == nil { return fmt.Errorf("job cannot be nil") } @@ -172,7 +172,7 @@ func (td *TimeDispatch) RemoveJob(jobID int64) error { // Try to remove from cyclic tasks if timeoutInterface, exists := td.cyclicTasks.LoadAndDelete(jobID); exists { - if timeout, ok := timeoutInterface.(*jobtypes.Timeout); ok { + if timeout, ok := timeoutInterface.(*job.Timeout); ok { timeout.Cancel() td.jobInfos.Delete(jobID) td.logger.V(1).Info("removed cyclic job", "jobID", jobID) @@ -182,7 +182,7 @@ func (td *TimeDispatch) RemoveJob(jobID int64) error { // Try to remove from temp tasks if timeoutInterface, exists := td.tempTasks.LoadAndDelete(jobID); exists { - if timeout, ok := timeoutInterface.(*jobtypes.Timeout); ok { + if timeout, ok := timeoutInterface.(*job.Timeout); ok { timeout.Cancel() td.jobInfos.Delete(jobID) td.logger.V(1).Info("removed one-time job", "jobID", jobID) @@ -233,14 +233,14 @@ func (td *TimeDispatch) Stop() error { // Cancel all running tasks td.cyclicTasks.Range(func(key, value interface{}) bool { - if timeout, ok := value.(*jobtypes.Timeout); ok { + if timeout, ok := value.(*job.Timeout); ok { timeout.Cancel() } return true }) td.tempTasks.Range(func(key, value interface{}) bool { - if timeout, ok := value.(*jobtypes.Timeout); ok { + if timeout, ok := value.(*job.Timeout); ok { timeout.Cancel() } return true @@ -339,7 +339,7 @@ func (td *TimeDispatch) UpdateJobExecution(jobID int64) { } // RescheduleJob reschedules a cyclic job for the next execution -func (td *TimeDispatch) RescheduleJob(job *jobtypes.Job) error { +func (td *TimeDispatch) RescheduleJob(job *job.Job) error { if !job.IsCyclic || job.DefaultInterval <= 0 { return fmt.Errorf("job is not cyclable or has invalid interval") } diff --git a/internal/collector/common/dispatcher/wheel_timer_task.go b/internal/job/dispatcher/wheel_timer_task.go similarity index 87% rename from internal/collector/common/dispatcher/wheel_timer_task.go rename to internal/job/dispatcher/wheel_timer_task.go index 657103a..09bfb18 100644 --- a/internal/collector/common/dispatcher/wheel_timer_task.go +++ b/internal/job/dispatcher/wheel_timer_task.go @@ -24,7 +24,7 @@ import ( "fmt" "time" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) @@ -34,20 +34,20 @@ type StatsRecorder interface { RecordJobCompleted() RecordJobFailed() UpdateJobExecution(jobID int64) - RescheduleJob(job *jobtypes.Job) error + RescheduleJob(job *job.Job) error } // WheelTimerTask represents a task that runs in the time wheel // This corresponds to Java's WheelTimerTask type WheelTimerTask struct { - job *jobtypes.Job + job *job.Job commonDispatcher MetricsTaskDispatcher statsRecorder StatsRecorder logger logger.Logger } // NewWheelTimerTask creates a new wheel timer task -func NewWheelTimerTask(job *jobtypes.Job, commonDispatcher MetricsTaskDispatcher, statsRecorder StatsRecorder, logger logger.Logger) *WheelTimerTask { +func NewWheelTimerTask(job *job.Job, commonDispatcher MetricsTaskDispatcher, statsRecorder StatsRecorder, logger logger.Logger) *WheelTimerTask { return &WheelTimerTask{ job: job, commonDispatcher: commonDispatcher, @@ -58,7 +58,7 @@ func NewWheelTimerTask(job *jobtypes.Job, commonDispatcher MetricsTaskDispatcher // Run executes the timer task // This corresponds to Java's WheelTimerTask.run method -func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout) error { +func (wtt *WheelTimerTask) Run(timeout *job.Timeout) error { if wtt.job == nil { wtt.logger.Error(nil, "job is nil, cannot execute") return fmt.Errorf("job is nil, cannot execute") @@ -116,7 +116,7 @@ func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout) error { } // rescheduleJob reschedules a cyclic job for the next execution -func (wtt *WheelTimerTask) rescheduleJob(timeout *jobtypes.Timeout) { +func (wtt *WheelTimerTask) rescheduleJob(timeout *job.Timeout) { if wtt.job.DefaultInterval <= 0 { wtt.logger.Info("job has no valid interval, not rescheduling", "jobID", wtt.job.ID) @@ -134,6 +134,6 @@ func (wtt *WheelTimerTask) rescheduleJob(timeout *jobtypes.Timeout) { } // GetJob returns the associated job -func (wtt *WheelTimerTask) GetJob() *jobtypes.Job { +func (wtt *WheelTimerTask) GetJob() *job.Job { return wtt.job } diff --git a/internal/collector/common/router/message_router.go b/internal/job/router/message_router.go similarity index 93% rename from internal/collector/common/router/message_router.go rename to internal/job/router/message_router.go index e3093ee..e03d9c2 100644 --- a/internal/collector/common/router/message_router.go +++ b/internal/job/router/message_router.go @@ -24,9 +24,9 @@ import ( "fmt" pb "hertzbeat.apache.org/hertzbeat-collector-go/api" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/job" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/server" "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport" + job2 "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) @@ -50,7 +50,7 @@ type MessageRouter interface { RegisterProcessors() // SendResult sends collection results back to the manager - SendResult(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job) error + SendResult(data *job2.CollectRepMetricsData, job *job2.Job) error // GetIdentity returns the collector identity GetIdentity() string @@ -60,7 +60,7 @@ type MessageRouter interface { type MessageRouterImpl struct { logger logger.Logger client transport.TransportClient - jobRunner *job.Runner + jobRunner *server.Runner identity string } @@ -68,7 +68,7 @@ type MessageRouterImpl struct { type Config struct { Logger logger.Logger Client transport.TransportClient - JobRunner *job.Runner + JobRunner *server.Runner Identity string } @@ -122,7 +122,7 @@ func (r *MessageRouterImpl) handleIssueCyclicTask(msg *pb.Message) (*pb.Message, r.logger.Info("Handling cyclic task message") // Parse job from message - var job jobtypes.Job + var job job2.Job if err := json.Unmarshal(msg.Msg, &job); err != nil { r.logger.Error(err, "failed to unmarshal job") return &pb.Message{ @@ -189,7 +189,7 @@ func (r *MessageRouterImpl) handleIssueOneTimeTask(msg *pb.Message) (*pb.Message r.logger.Info("Handling one-time task message") // Parse job from message - var job jobtypes.Job + var job job2.Job if err := json.Unmarshal(msg.Msg, &job); err != nil { r.logger.Error(err, "failed to unmarshal job") return &pb.Message{ @@ -223,7 +223,7 @@ func (r *MessageRouterImpl) handleIssueOneTimeTask(msg *pb.Message) (*pb.Message } // SendResult sends collection results back to the manager -func (r *MessageRouterImpl) SendResult(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job) error { +func (r *MessageRouterImpl) SendResult(data *job2.CollectRepMetricsData, job *job2.Job) error { if r.client == nil || !r.client.IsStarted() { return fmt.Errorf("transport client not started") } @@ -237,7 +237,7 @@ func (r *MessageRouterImpl) SendResult(data *jobtypes.CollectRepMetricsData, job } // Serialize metrics data - dataBytes, err := json.Marshal([]jobtypes.CollectRepMetricsData{*data}) + dataBytes, err := json.Marshal([]job2.CollectRepMetricsData{*data}) if err != nil { return fmt.Errorf("failed to marshal metrics data: %w", err) } diff --git a/internal/collector/common/job/job_server.go b/internal/job/server/job_server.go similarity index 92% rename from internal/collector/common/job/job_server.go rename to internal/job/server/job_server.go index 4451ffa..0a47a0e 100644 --- a/internal/collector/common/job/job_server.go +++ b/internal/job/server/job_server.go @@ -17,7 +17,7 @@ * under the License. */ -package job +package server import ( "context" @@ -25,13 +25,13 @@ import ( "fmt" "sync" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/dispatch" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/dispatcher" - clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/dispatch" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/strategy" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/dispatcher" + clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" ) // TimeDispatcher interface defines time-based job scheduling diff --git a/internal/collector/common/types/collector/collector_types.go b/internal/metrics/metrics.go similarity index 91% copy from internal/collector/common/types/collector/collector_types.go copy to internal/metrics/metrics.go index 4ab91e4..5e29267 100644 --- a/internal/collector/common/types/collector/collector_types.go +++ b/internal/metrics/metrics.go @@ -17,8 +17,4 @@ * under the License. */ -package collector - -type Info struct { - Name string `json:"name" yaml:"name"` -} +package metrics diff --git a/internal/collector/basic/database/jdbc_collector.go b/internal/protocol/basic/database/jdbc_collector.go similarity index 98% rename from internal/collector/basic/database/jdbc_collector.go rename to internal/protocol/basic/database/jdbc_collector.go index ec8f179..a84c6ac 100644 --- a/internal/collector/basic/database/jdbc_collector.go +++ b/internal/protocol/basic/database/jdbc_collector.go @@ -38,10 +38,10 @@ import ( _ "github.com/microsoft/go-mssqldb" _ "github.com/sijms/go-ora/v2" "golang.org/x/crypto/ssh" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/strategy" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" + protocol2 "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param" @@ -192,7 +192,7 @@ type sshTunnelHelper struct { // startSSHTunnel starts an SSH tunnel // It connects to the SSH bastion, listens on a local random port, // and forwards traffic to the target database. -func startSSHTunnel(config *protocol.SSHTunnel, remoteHost, remotePort string, timeout time.Duration, log logger.Logger) (*sshTunnelHelper, string, string, error) { +func startSSHTunnel(config *protocol2.SSHTunnel, remoteHost, remotePort string, timeout time.Duration, log logger.Logger) (*sshTunnelHelper, string, string, error) { sshHost := config.Host sshPort, err := strconv.Atoi(config.Port) if err != nil { @@ -322,7 +322,7 @@ func NewJDBCCollector(logger logger.Logger) *JDBCCollector { } // extractJDBCConfig extracts JDBC configuration from interface{} type -func extractJDBCConfig(jdbcInterface interface{}) (*protocol.JDBCProtocol, error) { +func extractJDBCConfig(jdbcInterface interface{}) (*protocol2.JDBCProtocol, error) { replacer := param.NewReplacer() return replacer.ExtractJDBCConfig(jdbcInterface) } @@ -404,7 +404,7 @@ func (jc *JDBCCollector) PreCheck(metrics *jobtypes.Metrics) error { } // checkTunnelParam validates SSH tunnel configuration -func (jc *JDBCCollector) checkTunnelParam(config *protocol.SSHTunnel) error { +func (jc *JDBCCollector) checkTunnelParam(config *protocol2.SSHTunnel) error { if config == nil { return nil } @@ -610,7 +610,7 @@ func validateURL(rawURL string, platform string) error { } // constructDatabaseURL constructs the database connection URL/DSN -func (jc *JDBCCollector) constructDatabaseURL(jdbc *protocol.JDBCProtocol, host, port string) (string, error) { +func (jc *JDBCCollector) constructDatabaseURL(jdbc *protocol2.JDBCProtocol, host, port string) (string, error) { // 1. If user provided a full URL, use it (already validated in PreCheck) if jdbc.URL != "" { // Validate again to prevent bypassing PreCheck diff --git a/internal/collector/basic/database/jdbc_collector_test.go b/internal/protocol/basic/database/jdbc_collector_test.go similarity index 100% rename from internal/collector/basic/database/jdbc_collector_test.go rename to internal/protocol/basic/database/jdbc_collector_test.go diff --git a/internal/collector/basic/imap/.keep b/internal/protocol/basic/dns/.keep similarity index 100% rename from internal/collector/basic/imap/.keep rename to internal/protocol/basic/dns/.keep diff --git a/internal/collector/basic/ipmi2/.keep b/internal/protocol/basic/ftp/.keep similarity index 100% rename from internal/collector/basic/ipmi2/.keep rename to internal/protocol/basic/ftp/.keep diff --git a/internal/protocol/basic/http/http_collector.go b/internal/protocol/basic/http/http_collector.go new file mode 100644 index 0000000..11a8b9b --- /dev/null +++ b/internal/protocol/basic/http/http_collector.go @@ -0,0 +1,596 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package http + +import ( + "bytes" + "crypto/md5" + "crypto/rand" + "crypto/tls" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "strconv" + "strings" + "time" + + "github.com/prometheus/common/expfmt" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/strategy" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +func init() { + strategy.RegisterFactory(ProtocolHTTP, func(logger logger.Logger) strategy.Collector { + return NewHTTPCollector(logger) + }) +} + +const ( + ProtocolHTTP = "http" + + // Parse Types + ParseTypeDefault = "default" + ParseTypeJsonPath = "jsonPath" + ParseTypePrometheus = "prometheus" + ParseTypeWebsite = "website" + ParseTypeHeader = "header" + + // Auth Types + AuthTypeBasic = "Basic Auth" + AuthTypeBearer = "Bearer Token" + AuthTypeDigest = "Digest Auth" +) + +type HTTPCollector struct { + logger logger.Logger +} + +func NewHTTPCollector(log logger.Logger) *HTTPCollector { + return &HTTPCollector{ + logger: log.WithName("http-collector"), + } +} + +func (hc *HTTPCollector) Protocol() string { + return ProtocolHTTP +} + +func (hc *HTTPCollector) PreCheck(metrics *job.Metrics) error { + if metrics == nil || metrics.HTTP == nil { + return fmt.Errorf("http configuration is missing") + } + return nil +} + +func (hc *HTTPCollector) Collect(metrics *job.Metrics) *job.CollectRepMetricsData { + start := time.Now() + + // MetricsCollector has already performed parameter replacement on metrics.HTTP + httpConfig := metrics.HTTP + + // 1. Prepare URL + targetURL := httpConfig.URL + + // Parse SSL bool string + isSSL := false + if httpConfig.SSL != "" { + if val, err := strconv.ParseBool(httpConfig.SSL); err == nil { + isSSL = val + } + } + + if targetURL == "" { + schema := "http" + if isSSL { + schema = "https" + } + // Use metrics.ConfigMap values if URL is empty + // Note: metrics.ConfigMap is already populated with job.Configmap values + host := metrics.ConfigMap["host"] + port := metrics.ConfigMap["port"] + if host != "" && port != "" { + targetURL = fmt.Sprintf("%s://%s:%s", schema, host, port) + } + } + + if targetURL != "" && !strings.HasPrefix(targetURL, "http") { + schema := "http" + if isSSL { + schema = "https" + } + targetURL = fmt.Sprintf("%s://%s", schema, targetURL) + } + + if targetURL == "" { + return hc.createFailResponse(metrics, constants.CollectFail, "target URL is empty") + } + + // 2. Create Request + req, err := hc.createRequest(httpConfig, targetURL) + if err != nil { + return hc.createFailResponse(metrics, constants.CollectFail, fmt.Sprintf("failed to create request: %v", err)) + } + + // 3. Create Client + timeoutStr := metrics.Timeout + if httpConfig.Timeout != "" { + timeoutStr = httpConfig.Timeout + } + + timeout := hc.getTimeout(timeoutStr) + client := &http.Client{ + Timeout: timeout, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + } + + // 4. Execute Request (with Digest retry logic) + resp, err := client.Do(req) + + // Handle Digest Auth Challenge (401) + if err == nil && resp.StatusCode == http.StatusUnauthorized && + httpConfig.Authorization != nil && + strings.EqualFold(httpConfig.Authorization.Type, AuthTypeDigest) { + + // Close the first response body + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + + authHeader := resp.Header.Get("WWW-Authenticate") + if authHeader != "" { + // Create new request with Authorization header + digestReq, digestErr := hc.handleDigestAuth(req, authHeader, httpConfig.Authorization) + if digestErr == nil { + // Retry request + resp, err = client.Do(digestReq) + } else { + hc.logger.Error(digestErr, "failed to handle digest auth") + } + } + } + + if err != nil { + return hc.createFailResponse(metrics, constants.CollectUnReachable, fmt.Sprintf("request failed: %v", err)) + } + defer resp.Body.Close() + + responseTime := time.Since(start).Milliseconds() + + // 5. Parse Response + parseType := httpConfig.ParseType + if parseType == "" { + parseType = ParseTypeDefault + } + + var responseData *job.CollectRepMetricsData + + // Read body + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + return hc.createFailResponse(metrics, constants.CollectFail, fmt.Sprintf("failed to read body: %v", err)) + } + + switch parseType { + case ParseTypePrometheus: + responseData, err = hc.parsePrometheus(bodyBytes, metrics) + case ParseTypeWebsite: + responseData, err = hc.parseWebsite(bodyBytes, resp.StatusCode, responseTime, metrics, httpConfig) + case ParseTypeHeader: + responseData, err = hc.parseHeader(resp.Header, metrics) + case ParseTypeJsonPath, ParseTypeDefault: + responseData, err = hc.parseJsonPath(bodyBytes, metrics, responseTime, httpConfig) + default: + responseData, err = hc.parseJsonPath(bodyBytes, metrics, responseTime, httpConfig) + } + + if err != nil { + hc.logger.Error(err, "parse response failed", "type", parseType) + return hc.createFailResponse(metrics, constants.CollectFail, fmt.Sprintf("parse error: %v", err)) + } + + hc.fillCommonFields(responseData, metrics.AliasFields, responseTime, resp.StatusCode) + + return responseData +} + +// handleDigestAuth generates a new request with the Digest Authorization header +func (hc *HTTPCollector) handleDigestAuth(originalReq *http.Request, authHeader string, authConfig *protocol.Authorization) (*http.Request, error) { + params := hc.parseAuthHeader(authHeader) + realm := params["realm"] + nonce := params["nonce"] + qop := params["qop"] + opaque := params["opaque"] + algorithm := params["algorithm"] + if algorithm == "" { + algorithm = "MD5" + } + + if realm == "" || nonce == "" { + return nil, fmt.Errorf("missing realm or nonce in digest header") + } + + username := authConfig.DigestAuthUsername + password := authConfig.DigestAuthPassword + ha1 := hc.md5Hex(fmt.Sprintf("%s:%s:%s", username, realm, password)) + + method := originalReq.Method + uri := originalReq.URL.RequestURI() + ha2 := hc.md5Hex(fmt.Sprintf("%s:%s", method, uri)) + + nc := "00000001" + cnonce := hc.generateCnonce() + + var responseStr string + if qop == "" { + responseStr = hc.md5Hex(fmt.Sprintf("%s:%s:%s", ha1, nonce, ha2)) + } else if strings.Contains(qop, "auth") { + responseStr = hc.md5Hex(fmt.Sprintf("%s:%s:%s:%s:%s:%s", ha1, nonce, nc, cnonce, "auth", ha2)) + } else { + return nil, fmt.Errorf("unsupported qop: %s", qop) + } + + authVal := fmt.Sprintf(`Digest username="%s", realm="%s", nonce="%s", uri="%s", response="%s", algorithm="%s"`, + username, realm, nonce, uri, responseStr, algorithm) + + if opaque != "" { + authVal += fmt.Sprintf(`, opaque="%s"`, opaque) + } + if qop != "" { + authVal += fmt.Sprintf(`, qop=auth, nc=%s, cnonce="%s"`, nc, cnonce) + } + + var newReq *http.Request + if originalReq.GetBody != nil { + body, _ := originalReq.GetBody() + newReq, _ = http.NewRequest(originalReq.Method, originalReq.URL.String(), body) + } else { + newReq, _ = http.NewRequest(originalReq.Method, originalReq.URL.String(), nil) + } + + for k, v := range originalReq.Header { + newReq.Header[k] = v + } + + newReq.Header.Set("Authorization", authVal) + return newReq, nil +} + +func (hc *HTTPCollector) parseAuthHeader(header string) map[string]string { + header = strings.TrimPrefix(header, "Digest ") + result := make(map[string]string) + + pairs := strings.Split(header, ",") + for _, pair := range pairs { + parts := strings.SplitN(strings.TrimSpace(pair), "=", 2) + if len(parts) == 2 { + key := strings.TrimSpace(parts[0]) + val := strings.TrimSpace(parts[1]) + val = strings.Trim(val, "\"") + result[key] = val + } + } + return result +} + +func (hc *HTTPCollector) md5Hex(s string) string { + hash := md5.Sum([]byte(s)) + return hex.EncodeToString(hash[:]) +} + +func (hc *HTTPCollector) generateCnonce() string { + b := make([]byte, 8) + io.ReadFull(rand.Reader, b) + return hex.EncodeToString(b) +} + +func (hc *HTTPCollector) createRequest(config *protocol.HTTPProtocol, targetURL string) (*http.Request, error) { + method := strings.ToUpper(config.Method) + if method == "" { + method = "GET" + } + + var body io.Reader + if config.Body != "" { + body = strings.NewReader(config.Body) + } + + req, err := http.NewRequest(method, targetURL, body) + if err != nil { + return nil, err + } + + for k, v := range config.Headers { + req.Header.Set(k, v) + } + + if config.Authorization != nil { + auth := config.Authorization + if auth.Type == "" || strings.EqualFold(auth.Type, AuthTypeBasic) { + if auth.BasicAuthUsername != "" && auth.BasicAuthPassword != "" { + req.SetBasicAuth(auth.BasicAuthUsername, auth.BasicAuthPassword) + } + } else if strings.EqualFold(auth.Type, AuthTypeBearer) { + if auth.BearerTokenToken != "" { + req.Header.Set("Authorization", "Bearer "+auth.BearerTokenToken) + } + } + } + + if len(config.Params) > 0 { + q := req.URL.Query() + for k, v := range config.Params { + q.Add(k, v) + } + req.URL.RawQuery = q.Encode() + } + + return req, nil +} + +// --- Parsers --- + +func (hc *HTTPCollector) parsePrometheus(body []byte, metrics *job.Metrics) (*job.CollectRepMetricsData, error) { + response := hc.createSuccessResponse(metrics) + var parser expfmt.TextParser + metricFamilies, err := parser.TextToMetricFamilies(bytes.NewReader(body)) + if err != nil { + return nil, err + } + now := time.Now().UnixMilli() + for _, mf := range metricFamilies { + for _, m := range mf.Metric { + row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))} + labels := make(map[string]string) + for _, pair := range m.Label { + labels[pair.GetName()] = pair.GetValue() + } + hasValue := false + for i, field := range metrics.AliasFields { + if field == constants.NULL_VALUE { + continue + } + if field == "value" || field == "prom_value" { + if m.Gauge != nil { + row.Columns[i] = fmt.Sprintf("%f", m.Gauge.GetValue()) + } else if m.Counter != nil { + row.Columns[i] = fmt.Sprintf("%f", m.Counter.GetValue()) + } else if m.Untyped != nil { + row.Columns[i] = fmt.Sprintf("%f", m.Untyped.GetValue()) + } + hasValue = true + } else { + if val, ok := labels[field]; ok { + row.Columns[i] = val + hasValue = true + } else { + row.Columns[i] = constants.NULL_VALUE + } + } + } + if hasValue { + response.Values = append(response.Values, row) + } + } + } + response.Time = now + return response, nil +} + +func (hc *HTTPCollector) parseWebsite(body []byte, statusCode int, responseTime int64, metrics *job.Metrics, httpConfig *protocol.HTTPProtocol) (*job.CollectRepMetricsData, error) { + response := hc.createSuccessResponse(metrics) + row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))} + keyword := httpConfig.Keyword + keywordCount := 0 + if keyword != "" { + keywordCount = strings.Count(string(body), keyword) + } + for i, field := range metrics.AliasFields { + switch strings.ToLower(field) { + case strings.ToLower(constants.StatusCode): + row.Columns[i] = strconv.Itoa(statusCode) + case strings.ToLower(constants.RESPONSE_TIME): + row.Columns[i] = strconv.FormatInt(responseTime, 10) + case "keyword": + row.Columns[i] = strconv.Itoa(keywordCount) + default: + row.Columns[i] = constants.NULL_VALUE + } + } + response.Values = append(response.Values, row) + return response, nil +} + +func (hc *HTTPCollector) parseHeader(header http.Header, metrics *job.Metrics) (*job.CollectRepMetricsData, error) { + response := hc.createSuccessResponse(metrics) + row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))} + for i, field := range metrics.AliasFields { + val := header.Get(field) + if val != "" { + row.Columns[i] = val + } else { + row.Columns[i] = constants.NULL_VALUE + } + } + response.Values = append(response.Values, row) + return response, nil +} + +func (hc *HTTPCollector) parseJsonPath(body []byte, metrics *job.Metrics, responseTime int64, httpConfig *protocol.HTTPProtocol) (*job.CollectRepMetricsData, error) { + response := hc.createSuccessResponse(metrics) + var data interface{} + decoder := json.NewDecoder(bytes.NewReader(body)) + decoder.UseNumber() + if err := decoder.Decode(&data); err != nil { + return response, nil + } + parseScript := httpConfig.ParseScript + root := data + if parseScript != "" && parseScript != "$" { + root = hc.navigateJson(data, parseScript) + } + if root == nil { + return response, nil + } + if arr, ok := root.([]interface{}); ok { + for _, item := range arr { + row := hc.extractRow(item, metrics.AliasFields) + response.Values = append(response.Values, row) + } + } else { + row := hc.extractRow(root, metrics.AliasFields) + response.Values = append(response.Values, row) + } + return response, nil +} + +func (hc *HTTPCollector) navigateJson(data interface{}, path string) interface{} { + path = strings.TrimPrefix(path, "$.") + if path == "" { + return data + } + parts := strings.Split(path, ".") + current := data + for _, part := range parts { + if current == nil { + return nil + } + key := part + idx := -1 + if i := strings.Index(part, "["); i > -1 && strings.HasSuffix(part, "]") { + key = part[:i] + idxStr := part[i+1 : len(part)-1] + if val, err := strconv.Atoi(idxStr); err == nil { + idx = val + } + } + if key != "" { + if m, ok := current.(map[string]interface{}); ok { + if val, exists := m[key]; exists { + current = val + } else { + return nil + } + } else { + return nil + } + } + if idx > -1 { + if arr, ok := current.([]interface{}); ok { + if idx < len(arr) { + current = arr[idx] + } else { + return nil + } + } else { + return nil + } + } + } + return current +} + +func (hc *HTTPCollector) extractRow(item interface{}, fields []string) job.ValueRow { + row := job.ValueRow{Columns: make([]string, len(fields))} + m, isMap := item.(map[string]interface{}) + for i, field := range fields { + if strings.EqualFold(field, constants.RESPONSE_TIME) || strings.EqualFold(field, constants.StatusCode) { + row.Columns[i] = constants.NULL_VALUE + continue + } + var val interface{} + if isMap { + if v, ok := m[field]; ok { + val = v + } else { + val = hc.navigateJson(item, field) + } + } + if val != nil { + row.Columns[i] = fmt.Sprintf("%v", val) + } else { + row.Columns[i] = constants.NULL_VALUE + } + } + return row +} + +func (hc *HTTPCollector) fillCommonFields(resp *job.CollectRepMetricsData, fields []string, responseTime int64, statusCode int) { + if resp == nil { + return + } + if len(resp.Fields) == 0 { + resp.Fields = make([]job.Field, len(fields)) + for i, f := range fields { + resp.Fields[i] = job.Field{Field: f, Type: constants.TYPE_STRING} + } + } + if len(resp.Values) == 0 { + row := job.ValueRow{Columns: make([]string, len(fields))} + for k := range row.Columns { + row.Columns[k] = constants.NULL_VALUE + } + resp.Values = append(resp.Values, row) + } + for i := range resp.Values { + for j, field := range fields { + if strings.EqualFold(field, constants.RESPONSE_TIME) { + resp.Values[i].Columns[j] = strconv.FormatInt(responseTime, 10) + } + if strings.EqualFold(field, constants.StatusCode) { + resp.Values[i].Columns[j] = strconv.Itoa(statusCode) + } + } + } +} + +func (hc *HTTPCollector) getTimeout(timeoutStr string) time.Duration { + if timeoutStr == "" { + return 10 * time.Second + } + if val, err := strconv.Atoi(timeoutStr); err == nil { + return time.Duration(val) * time.Millisecond + } + return 10 * time.Second +} + +func (hc *HTTPCollector) createSuccessResponse(metrics *job.Metrics) *job.CollectRepMetricsData { + return &job.CollectRepMetricsData{ + Metrics: metrics.Name, + Time: time.Now().UnixMilli(), + Code: constants.CollectSuccess, + Msg: "success", + Values: make([]job.ValueRow, 0), + } +} + +func (hc *HTTPCollector) createFailResponse(metrics *job.Metrics, code int, msg string) *job.CollectRepMetricsData { + return &job.CollectRepMetricsData{ + Metrics: metrics.Name, + Time: time.Now().UnixMilli(), + Code: code, + Msg: msg, + } +} diff --git a/internal/collector/basic/memcached/.keep b/internal/protocol/basic/icmp/.keep similarity index 100% rename from internal/collector/basic/memcached/.keep rename to internal/protocol/basic/icmp/.keep diff --git a/internal/collector/basic/modbus/.keep b/internal/protocol/basic/imap/.keep similarity index 100% rename from internal/collector/basic/modbus/.keep rename to internal/protocol/basic/imap/.keep diff --git a/internal/collector/basic/mqtt/.keep b/internal/protocol/basic/ipmi2/.keep similarity index 100% rename from internal/collector/basic/mqtt/.keep rename to internal/protocol/basic/ipmi2/.keep diff --git a/internal/collector/basic/nginx/.keep b/internal/protocol/basic/memcached/.keep similarity index 100% rename from internal/collector/basic/nginx/.keep rename to internal/protocol/basic/memcached/.keep diff --git a/internal/collector/basic/ntp/.keep b/internal/protocol/basic/modbus/.keep similarity index 100% rename from internal/collector/basic/ntp/.keep rename to internal/protocol/basic/modbus/.keep diff --git a/internal/collector/basic/plc/.keep b/internal/protocol/basic/mqtt/.keep similarity index 100% rename from internal/collector/basic/plc/.keep rename to internal/protocol/basic/mqtt/.keep diff --git a/internal/collector/basic/pop3/.keep b/internal/protocol/basic/nginx/.keep similarity index 100% rename from internal/collector/basic/pop3/.keep rename to internal/protocol/basic/nginx/.keep diff --git a/internal/collector/basic/prometheus/.keep b/internal/protocol/basic/ntp/.keep similarity index 100% rename from internal/collector/basic/prometheus/.keep rename to internal/protocol/basic/ntp/.keep diff --git a/internal/collector/basic/push/.keep b/internal/protocol/basic/plc/.keep similarity index 100% rename from internal/collector/basic/push/.keep rename to internal/protocol/basic/plc/.keep diff --git a/internal/collector/basic/redfish/.keep b/internal/protocol/basic/pop3/.keep similarity index 100% rename from internal/collector/basic/redfish/.keep rename to internal/protocol/basic/pop3/.keep diff --git a/internal/collector/basic/s7/.keep b/internal/protocol/basic/prometheus/.keep similarity index 100% rename from internal/collector/basic/s7/.keep rename to internal/protocol/basic/prometheus/.keep diff --git a/internal/collector/basic/script/.keep b/internal/protocol/basic/push/.keep similarity index 100% rename from internal/collector/basic/script/.keep rename to internal/protocol/basic/push/.keep diff --git a/internal/collector/basic/sd/.keep b/internal/protocol/basic/redfish/.keep similarity index 100% rename from internal/collector/basic/sd/.keep rename to internal/protocol/basic/redfish/.keep diff --git a/internal/collector/basic/redis/redis_collector.go b/internal/protocol/basic/redis/redis_collector.go similarity index 93% rename from internal/collector/basic/redis/redis_collector.go rename to internal/protocol/basic/redis/redis_collector.go index 9b23741..8b592f4 100644 --- a/internal/collector/basic/redis/redis_collector.go +++ b/internal/protocol/basic/redis/redis_collector.go @@ -30,10 +30,10 @@ import ( "github.com/redis/go-redis/v9" "golang.org/x/crypto/ssh" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/strategy" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" + protocol2 "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" sshhelper "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/ssh" @@ -143,7 +143,7 @@ func (rc *RedisCollector) Collect(metrics *jobtypes.Metrics) *jobtypes.CollectRe return response } -func (rc *RedisCollector) collectSingle(ctx context.Context, metrics *jobtypes.Metrics, config *protocol.RedisProtocol, dialer func(context.Context, string, string) (net.Conn, error), timeout time.Duration, response *jobtypes.CollectRepMetricsData, startTime time.Time) { +func (rc *RedisCollector) collectSingle(ctx context.Context, metrics *jobtypes.Metrics, config *protocol2.RedisProtocol, dialer func(context.Context, string, string) (net.Conn, error), timeout time.Duration, response *jobtypes.CollectRepMetricsData, startTime time.Time) { opts := &redis.Options{ Addr: fmt.Sprintf("%s:%s", config.Host, config.Port), Username: config.Username, @@ -174,7 +174,7 @@ func (rc *RedisCollector) collectSingle(ctx context.Context, metrics *jobtypes.M rc.addValueRow(response, metrics.AliasFields, parseMap) } -func (rc *RedisCollector) collectCluster(ctx context.Context, metrics *jobtypes.Metrics, config *protocol.RedisProtocol, dialer func(context.Context, string, string) (net.Conn, error), timeout time.Duration, response *jobtypes.CollectRepMetricsData, startTime time.Time) { +func (rc *RedisCollector) collectCluster(ctx context.Context, metrics *jobtypes.Metrics, config *protocol2.RedisProtocol, dialer func(context.Context, string, string) (net.Conn, error), timeout time.Duration, response *jobtypes.CollectRepMetricsData, startTime time.Time) { opts := &redis.ClusterOptions{ Addrs: []string{fmt.Sprintf("%s:%s", config.Host, config.Port)}, Username: config.Username, @@ -301,13 +301,13 @@ func (rc *RedisCollector) parseInfo(info string) map[string]string { } // createRedisDialer creates a dialer function that supports SSH tunneling -func (rc *RedisCollector) createRedisDialer(sshTunnel *protocol.SSHTunnel) (func(context.Context, string, string) (net.Conn, error), error) { +func (rc *RedisCollector) createRedisDialer(sshTunnel *protocol2.SSHTunnel) (func(context.Context, string, string) (net.Conn, error), error) { if sshTunnel == nil || sshTunnel.Enable != "true" { return nil, nil } // Create SSH config - sshConfig := &protocol.SSHProtocol{ + sshConfig := &protocol2.SSHProtocol{ Host: sshTunnel.Host, Port: sshTunnel.Port, Username: sshTunnel.Username, diff --git a/internal/collector/basic/redis/redis_collector_test.go b/internal/protocol/basic/redis/redis_collector_test.go similarity index 97% rename from internal/collector/basic/redis/redis_collector_test.go rename to internal/protocol/basic/redis/redis_collector_test.go index 56f7b2c..d611a32 100644 --- a/internal/collector/basic/redis/redis_collector_test.go +++ b/internal/protocol/basic/redis/redis_collector_test.go @@ -26,10 +26,10 @@ import ( "github.com/alicebob/miniredis/v2" "github.com/stretchr/testify/assert" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" - loggertype "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol" + loggertype "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) diff --git a/internal/collector/basic/smtp/.keep b/internal/protocol/basic/s7/.keep similarity index 100% rename from internal/collector/basic/smtp/.keep rename to internal/protocol/basic/s7/.keep diff --git a/internal/collector/basic/snmp/.keep b/internal/protocol/basic/script/.keep similarity index 100% rename from internal/collector/basic/snmp/.keep rename to internal/protocol/basic/script/.keep diff --git a/internal/collector/basic/telnet/.keep b/internal/protocol/basic/sd/.keep similarity index 100% rename from internal/collector/basic/telnet/.keep rename to internal/protocol/basic/sd/.keep diff --git a/internal/collector/basic/udp/.keep b/internal/protocol/basic/smtp/.keep similarity index 100% rename from internal/collector/basic/udp/.keep rename to internal/protocol/basic/smtp/.keep diff --git a/internal/collector/basic/websocket/.keep b/internal/protocol/basic/snmp/.keep similarity index 100% rename from internal/collector/basic/websocket/.keep rename to internal/protocol/basic/snmp/.keep diff --git a/internal/collector/basic/ssh/ssh_collector.go b/internal/protocol/basic/ssh/ssh_collector.go similarity index 98% rename from internal/collector/basic/ssh/ssh_collector.go rename to internal/protocol/basic/ssh/ssh_collector.go index a611ded..fcf2cd5 100644 --- a/internal/collector/basic/ssh/ssh_collector.go +++ b/internal/protocol/basic/ssh/ssh_collector.go @@ -26,10 +26,10 @@ import ( "strings" "time" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/strategy" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param" sshhelper "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/ssh" diff --git a/internal/collector/common/cache/.keep b/internal/protocol/basic/telnet/.keep similarity index 100% rename from internal/collector/common/cache/.keep rename to internal/protocol/basic/telnet/.keep diff --git a/internal/collector/common/dispatcher/entrance/.keep b/internal/protocol/basic/udp/.keep similarity index 100% rename from internal/collector/common/dispatcher/entrance/.keep rename to internal/protocol/basic/udp/.keep diff --git a/internal/collector/common/dispatcher/exporter/.keep b/internal/protocol/basic/websocket/.keep similarity index 100% rename from internal/collector/common/dispatcher/exporter/.keep rename to internal/protocol/basic/websocket/.keep diff --git a/internal/collector/extension/kafka/.keep b/internal/protocol/extension/kafka/.keep similarity index 100% rename from internal/collector/extension/kafka/.keep rename to internal/protocol/extension/kafka/.keep diff --git a/internal/collector/extension/milvus/milvus_collector.go b/internal/protocol/extension/milvus/milvus_collector.go similarity index 96% rename from internal/collector/extension/milvus/milvus_collector.go rename to internal/protocol/extension/milvus/milvus_collector.go index 6f69317..8935ff1 100644 --- a/internal/collector/extension/milvus/milvus_collector.go +++ b/internal/protocol/extension/milvus/milvus_collector.go @@ -27,9 +27,9 @@ import ( "time" "github.com/milvus-io/milvus-sdk-go/v2/client" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/strategy" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) diff --git a/internal/collector/extension/milvus/milvus_collector_test.go b/internal/protocol/extension/milvus/milvus_collector_test.go similarity index 51% rename from internal/collector/extension/milvus/milvus_collector_test.go rename to internal/protocol/extension/milvus/milvus_collector_test.go index 137d4c3..7aa5433 100644 --- a/internal/collector/extension/milvus/milvus_collector_test.go +++ b/internal/protocol/extension/milvus/milvus_collector_test.go @@ -20,32 +20,32 @@ package milvus import ( - "os" - "testing" + "os" + "testing" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" - loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol" + loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) func TestMilvusCollector_Collect(t *testing.T) { - metrics := &job.Metrics{ - Name: "milvus_test", - Milvus: &protocol.MilvusProtocol{ - Host: "localhost", - Port: "19530", - }, - AliasFields: []string{"version", "responseTime", "host", "port"}, - } + metrics := &job.Metrics{ + Name: "milvus_test", + Milvus: &protocol.MilvusProtocol{ + Host: "localhost", + Port: "19530", + }, + AliasFields: []string{"version", "responseTime", "host", "port"}, + } - l := logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo) - collector := NewMilvusCollector(l) - result := collector.Collect(metrics) + l := logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo) + collector := NewMilvusCollector(l) + result := collector.Collect(metrics) - if result.Code != 0 { - t.Logf("Collect failed: %s", result.Msg) - } else { - t.Logf("Collect success: %+v", result.Values) - } + if result.Code != 0 { + t.Logf("Collect failed: %s", result.Msg) + } else { + t.Logf("Collect success: %+v", result.Values) + } } diff --git a/internal/collector/extension/mongodb/.keep b/internal/protocol/extension/mongodb/.keep similarity index 100% rename from internal/collector/extension/mongodb/.keep rename to internal/protocol/extension/mongodb/.keep diff --git a/internal/collector/extension/nebulagraph/.keep b/internal/protocol/extension/nebulagraph/.keep similarity index 100% rename from internal/collector/extension/nebulagraph/.keep rename to internal/protocol/extension/nebulagraph/.keep diff --git a/internal/collector/extension/rocketmq/.keep b/internal/protocol/extension/rocketmq/.keep similarity index 100% rename from internal/collector/extension/rocketmq/.keep rename to internal/protocol/extension/rocketmq/.keep diff --git a/internal/collector/basic/standard/imports.go b/internal/protocol/standard/imports.go similarity index 67% rename from internal/collector/basic/standard/imports.go rename to internal/protocol/standard/imports.go index 5eb3217..ae47d46 100644 --- a/internal/collector/basic/standard/imports.go +++ b/internal/protocol/standard/imports.go @@ -20,11 +20,11 @@ package standard import ( - _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/database" - _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/http" - _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/redis" - _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/ssh" + _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/protocol/basic/database" + _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/protocol/basic/http" + _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/protocol/basic/redis" + _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/protocol/basic/ssh" // extensions - _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/extension/milvus" + _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/protocol/extension/milvus" ) diff --git a/internal/collector/common/server/server.go b/internal/server/server.go similarity index 88% rename from internal/collector/common/server/server.go rename to internal/server/server.go index 0994f9e..f06a5fe 100644 --- a/internal/collector/common/server/server.go +++ b/internal/server/server.go @@ -22,8 +22,8 @@ package server import ( "io" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config" - logger2 "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config" + logger2 "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) diff --git a/internal/collector/common/server/server_test.go b/internal/server/server_test.go similarity index 100% rename from internal/collector/common/server/server_test.go rename to internal/server/server_test.go diff --git a/internal/transport/processors.go b/internal/transport/processors.go index 408a0b5..d969f48 100644 --- a/internal/transport/processors.go +++ b/internal/transport/processors.go @@ -25,8 +25,8 @@ import ( "time" pb "hertzbeat.apache.org/hertzbeat-collector-go/api" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" - loggertype "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" + loggertype "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/crypto" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param" diff --git a/internal/transport/transport.go b/internal/transport/transport.go new file mode 100644 index 0000000..807630a --- /dev/null +++ b/internal/transport/transport.go @@ -0,0 +1,276 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package transport + +import ( + "context" + "encoding/json" + "fmt" + "os" + + pb "hertzbeat.apache.org/hertzbeat-collector-go/api" + config2 "hertzbeat.apache.org/hertzbeat-collector-go/internal/config" + clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector" + configtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config" + loggertypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" +) + +const ( + // DefaultManagerAddr is the default manager server address (Java Netty default port) + DefaultManagerAddr = "127.0.0.1:1158" + // DefaultProtocol is the default communication protocol for Java compatibility + DefaultProtocol = "netty" + // DefaultMode is the default operation mode + DefaultMode = "public" + // DefaultIdentity is the default collector identity + DefaultIdentity = "collector-go" +) + +type Runner struct { + Config *configtypes.CollectorConfig + client TransportClient + jobScheduler JobScheduler + clrserver.Server +} + +func New(cfg *configtypes.CollectorConfig) *Runner { + return &Runner{ + Config: cfg, + Server: clrserver.Server{ + Logger: logger.Logger{}, // Will be initialized properly in Start method + }, + } +} + +// SetJobScheduler sets the job scheduler for the transport runner +func (r *Runner) SetJobScheduler(scheduler JobScheduler) { + r.jobScheduler = scheduler +} + +// NewFromConfig creates a new transport runner from collector configuration +func NewFromConfig(cfg *configtypes.CollectorConfig) *Runner { + if cfg == nil { + return nil + } + return New(cfg) +} + +// NewFromEnv creates a new transport runner from environment variables +func NewFromEnv() *Runner { + envLoader := config2.NewEnvConfigLoader() + cfg := envLoader.LoadFromEnv() + return NewFromConfig(cfg) +} + +// NewFromUnifiedConfig creates a new transport runner using unified configuration loading +// It loads from file first, then overrides with environment variables +func NewFromUnifiedConfig(cfgPath string) (*Runner, error) { + unifiedLoader := config2.NewUnifiedConfigLoader(cfgPath) + cfg, err := unifiedLoader.Load() + if err != nil { + return nil, err + } + return NewFromConfig(cfg), nil +} + +func (r *Runner) Start(ctx context.Context) error { + // 初始化 Logger 如果它还没有被设置 + if r.Logger.IsZero() { + r.Logger = logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo) + } + r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", r.Info().Name) + r.Logger.Info("Starting transport client") + + // 构建 server 地址 + addr := fmt.Sprintf("%s:%s", r.Config.Collector.Manager.Host, r.Config.Collector.Manager.Port) + if addr == ":" { + // 如果配置为空,使用环境变量或默认值 + if v := os.Getenv("MANAGER_ADDR"); v != "" { + addr = v + } else { + addr = DefaultManagerAddr + } + } + + // 确定协议 + protocol := r.Config.Collector.Manager.Protocol + if protocol == "" { + if v := os.Getenv("MANAGER_PROTOCOL"); v != "" { + protocol = v + } else { + protocol = DefaultProtocol + } + } + + r.Logger.Info("Connecting to manager server", "addr", addr, "protocol", protocol) + + // 创建客户端 + factory := &TransportClientFactory{} + client, err := factory.CreateClient(protocol, addr) + if err != nil { + r.Logger.Error(err, "Failed to create transport client") + return err + } + + // Set the identity on the client if it supports it + identity := r.Config.Collector.Identity + if identity == "" { + identity = DefaultIdentity + } + + if nettyClient, ok := client.(*NettyClient); ok { + nettyClient.SetIdentity(identity) + } + + r.client = client + + // 设置事件处理器 + switch c := client.(type) { + case *GrpcClient: + c.SetEventHandler(func(event Event) { + switch event.Type { + case EventConnected: + r.Logger.Info("Connected to manager gRPC server", "addr", event.Address) + go r.sendOnlineMessage() + case EventDisconnected: + r.Logger.Info("Disconnected from manager gRPC server", "addr", event.Address) + case EventConnectFailed: + r.Logger.Error(event.Error, "Failed to connect to manager gRPC server", "addr", event.Address) + } + }) + // Register processors with job scheduler + if r.jobScheduler != nil { + RegisterDefaultProcessors(c, r.jobScheduler) + r.Logger.Info("Registered gRPC processors with job scheduler") + } else { + RegisterDefaultProcessors(c, nil) + r.Logger.Info("Registered gRPC processors without job scheduler") + } + case *NettyClient: + c.SetEventHandler(func(event Event) { + switch event.Type { + case EventConnected: + r.Logger.Info("Connected to manager netty server", "addr", event.Address) + go r.sendOnlineMessage() + case EventDisconnected: + r.Logger.Info("Disconnected from manager netty server", "addr", event.Address) + case EventConnectFailed: + r.Logger.Error(event.Error, "Failed to connect to manager netty server", "addr", event.Address) + } + }) + // Register processors with job scheduler + if r.jobScheduler != nil { + RegisterDefaultNettyProcessors(c, r.jobScheduler) + r.Logger.Info("Registered netty processors with job scheduler") + } else { + RegisterDefaultNettyProcessors(c, nil) + r.Logger.Info("Registered netty processors without job scheduler") + } + } + + if err := r.client.Start(); err != nil { + r.Logger.Error(err, "Failed to start transport client") + return err + } + + // 创建新的context用于监控关闭信号 + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // 监听 ctx.Done 优雅关闭 + go func() { + <-ctx.Done() + r.Logger.Info("Shutting down transport client...") + _ = r.client.Shutdown() + }() + + // 阻塞直到 ctx.Done + <-ctx.Done() + return nil +} + +func (r *Runner) sendOnlineMessage() { + if r.client != nil && r.client.IsStarted() { + // Use the configured identity + identity := r.Config.Collector.Identity + if identity == "" { + identity = DefaultIdentity + } + + // Create CollectorInfo JSON structure as expected by Java server + mode := r.Config.Collector.Mode + if mode == "" { + mode = DefaultMode // Default mode as in Java version + } + + collectorInfo := map[string]interface{}{ + "name": identity, + "ip": "", // Let server detect IP + "version": "1.0.0", + "mode": mode, + } + + // Convert to JSON bytes + jsonData, err := json.Marshal(collectorInfo) + if err != nil { + r.Logger.Error(err, "Failed to marshal collector info to JSON") + return + } + + onlineMsg := &pb.Message{ + Type: pb.MessageType_GO_ONLINE, + Direction: pb.Direction_REQUEST, + Identity: identity, + Msg: jsonData, + } + + r.Logger.Info("Sending online message", "identity", identity, "type", onlineMsg.Type) + + if err := r.client.SendMsg(onlineMsg); err != nil { + r.Logger.Error(err, "Failed to send online message", "identity", identity) + } else { + r.Logger.Info("Online message sent successfully", "identity", identity) + } + } +} + +func (r *Runner) Info() collector.Info { + return collector.Info{ + Name: "transport", + } +} + +func (r *Runner) Close() error { + r.Logger.Info("transport close...") + if r.client != nil { + _ = r.client.Shutdown() + } + return nil +} + +// GetClient returns the transport client (for testing and advanced usage) +func (r *Runner) GetClient() TransportClient { + return r.client +} + +// IsConnected returns whether the client is connected and started +func (r *Runner) IsConnected() bool { + return r.client != nil && r.client.IsStarted() +} diff --git a/internal/collector/common/types/collector/collector_types.go b/internal/types/collector/collector_types.go similarity index 100% rename from internal/collector/common/types/collector/collector_types.go rename to internal/types/collector/collector_types.go diff --git a/internal/collector/common/types/config/config_types.go b/internal/types/config/config_types.go similarity index 100% rename from internal/collector/common/types/config/config_types.go rename to internal/types/config/config_types.go diff --git a/internal/collector/common/types/err/error_types.go b/internal/types/err/error_types.go similarity index 100% rename from internal/collector/common/types/err/error_types.go rename to internal/types/err/error_types.go diff --git a/internal/collector/common/types/job/job_types.go b/internal/types/job/job_types.go similarity index 100% rename from internal/collector/common/types/job/job_types.go rename to internal/types/job/job_types.go diff --git a/internal/collector/common/types/job/metrics_types.go b/internal/types/job/metrics_types.go similarity index 88% rename from internal/collector/common/types/job/metrics_types.go rename to internal/types/job/metrics_types.go index ea591f3..207bf07 100644 --- a/internal/collector/common/types/job/metrics_types.go +++ b/internal/types/job/metrics_types.go @@ -21,7 +21,7 @@ import ( "fmt" "time" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" + protocol2 "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol" ) // Metrics represents a metric configuration @@ -46,14 +46,14 @@ type Metrics struct { HasSubTask bool `json:"hasSubTask"` // Protocol specific fields - HTTP *protocol.HTTPProtocol `json:"http,omitempty"` - SSH *protocol.SSHProtocol `json:"ssh,omitempty"` - JDBC *protocol.JDBCProtocol `json:"jdbc,omitempty"` - SNMP *protocol.SNMPProtocol `json:"snmp,omitempty"` - JMX *protocol.JMXProtocol `json:"jmx,omitempty"` - Redis *protocol.RedisProtocol `json:"redis,omitempty"` - MongoDB *protocol.MongoDBProtocol `json:"mongodb,omitempty"` - Milvus *protocol.MilvusProtocol `json:"milvus,omitempty"` + HTTP *protocol2.HTTPProtocol `json:"http,omitempty"` + SSH *protocol2.SSHProtocol `json:"ssh,omitempty"` + JDBC *protocol2.JDBCProtocol `json:"jdbc,omitempty"` + SNMP *protocol2.SNMPProtocol `json:"snmp,omitempty"` + JMX *protocol2.JMXProtocol `json:"jmx,omitempty"` + Redis *protocol2.RedisProtocol `json:"redis,omitempty"` + MongoDB *protocol2.MongoDBProtocol `json:"mongodb,omitempty"` + Milvus *protocol2.MilvusProtocol `json:"milvus,omitempty"` } // Field represents a metric field @@ -218,18 +218,18 @@ type SSHProtocol struct { // JDBCProtocol represents JDBC protocol configuration type JDBCProtocol struct { - Host string `json:"host"` - Port string `json:"port"` - Platform string `json:"platform"` - Username string `json:"username"` - Password string `json:"password"` - Database string `json:"database"` - Timeout string `json:"timeout"` - QueryType string `json:"queryType"` - SQL string `json:"sql"` - URL string `json:"url"` - ReuseConnection string `json:"reuseConnection"` - SSHTunnel *protocol.SSHTunnel `json:"sshTunnel,omitempty"` + Host string `json:"host"` + Port string `json:"port"` + Platform string `json:"platform"` + Username string `json:"username"` + Password string `json:"password"` + Database string `json:"database"` + Timeout string `json:"timeout"` + QueryType string `json:"queryType"` + SQL string `json:"sql"` + URL string `json:"url"` + ReuseConnection string `json:"reuseConnection"` + SSHTunnel *protocol2.SSHTunnel `json:"sshTunnel,omitempty"` } // SNMPProtocol represents SNMP protocol configuration @@ -262,13 +262,13 @@ type JMXProtocol struct { // RedisProtocol represents Redis protocol configuration type RedisProtocol struct { - Host string `json:"host"` - Port string `json:"port"` - Username string `json:"username"` - Password string `json:"password"` - Pattern string `json:"pattern"` - Timeout string `json:"timeout"` - SSHTunnel *protocol.SSHTunnel `json:"sshTunnel,omitempty"` + Host string `json:"host"` + Port string `json:"port"` + Username string `json:"username"` + Password string `json:"password"` + Pattern string `json:"pattern"` + Timeout string `json:"timeout"` + SSHTunnel *protocol2.SSHTunnel `json:"sshTunnel,omitempty"` } // MongoDBProtocol represents MongoDB protocol configuration diff --git a/internal/collector/common/types/job/protocol/common_request_protocol.go b/internal/types/job/protocol/common_request_protocol.go similarity index 100% rename from internal/collector/common/types/job/protocol/common_request_protocol.go rename to internal/types/job/protocol/common_request_protocol.go diff --git a/internal/collector/common/types/job/protocol/consul_sd_protocol.go b/internal/types/job/protocol/consul_sd_protocol.go similarity index 100% rename from internal/collector/common/types/job/protocol/consul_sd_protocol.go rename to internal/types/job/protocol/consul_sd_protocol.go diff --git a/internal/collector/common/types/job/protocol/http_protocol.go b/internal/types/job/protocol/http_protocol.go similarity index 100% rename from internal/collector/common/types/job/protocol/http_protocol.go rename to internal/types/job/protocol/http_protocol.go diff --git a/internal/collector/common/types/job/protocol/jdbc_protocol.go b/internal/types/job/protocol/jdbc_protocol.go similarity index 100% rename from internal/collector/common/types/job/protocol/jdbc_protocol.go rename to internal/types/job/protocol/jdbc_protocol.go diff --git a/internal/collector/common/types/job/protocol/jmx_protocol.go b/internal/types/job/protocol/jmx_protocol.go similarity index 100% rename from internal/collector/common/types/job/protocol/jmx_protocol.go rename to internal/types/job/protocol/jmx_protocol.go diff --git a/internal/collector/common/types/job/protocol/milvus_protocol.go b/internal/types/job/protocol/milvus_protocol.go similarity index 100% rename from internal/collector/common/types/job/protocol/milvus_protocol.go rename to internal/types/job/protocol/milvus_protocol.go diff --git a/internal/collector/common/types/job/protocol/mongodb_protocol.go b/internal/types/job/protocol/mongodb_protocol.go similarity index 100% rename from internal/collector/common/types/job/protocol/mongodb_protocol.go rename to internal/types/job/protocol/mongodb_protocol.go diff --git a/internal/collector/common/types/job/protocol/redis_protocol.go b/internal/types/job/protocol/redis_protocol.go similarity index 100% rename from internal/collector/common/types/job/protocol/redis_protocol.go rename to internal/types/job/protocol/redis_protocol.go diff --git a/internal/collector/common/types/job/protocol/snmp_protocol.go b/internal/types/job/protocol/snmp_protocol.go similarity index 100% rename from internal/collector/common/types/job/protocol/snmp_protocol.go rename to internal/types/job/protocol/snmp_protocol.go diff --git a/internal/collector/common/types/job/protocol/ssh_protocol.go b/internal/types/job/protocol/ssh_protocol.go similarity index 100% rename from internal/collector/common/types/job/protocol/ssh_protocol.go rename to internal/types/job/protocol/ssh_protocol.go diff --git a/internal/collector/common/types/job/protocol/ssh_tunnel.go b/internal/types/job/protocol/ssh_tunnel.go similarity index 100% rename from internal/collector/common/types/job/protocol/ssh_tunnel.go rename to internal/types/job/protocol/ssh_tunnel.go diff --git a/internal/collector/common/types/job/protocol/zookeeper_sd_protocol.go b/internal/types/job/protocol/zookeeper_sd_protocol.go similarity index 100% rename from internal/collector/common/types/job/protocol/zookeeper_sd_protocol.go rename to internal/types/job/protocol/zookeeper_sd_protocol.go diff --git a/internal/collector/common/types/job/timeout_types.go b/internal/types/job/timeout_types.go similarity index 100% rename from internal/collector/common/types/job/timeout_types.go rename to internal/types/job/timeout_types.go diff --git a/internal/collector/common/types/logger/logging_types.go b/internal/types/logger/logging_types.go similarity index 100% rename from internal/collector/common/types/logger/logging_types.go rename to internal/types/logger/logging_types.go diff --git a/internal/util/arrow/arrow_serializer.go b/internal/util/arrow/arrow_serializer.go index 1915b43..0b78f5a 100644 --- a/internal/util/arrow/arrow_serializer.go +++ b/internal/util/arrow/arrow_serializer.go @@ -24,7 +24,7 @@ import ( "encoding/json" "fmt" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" + jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) diff --git a/internal/util/crypto/aes_util.go b/internal/util/crypto/aes_util.go index 678760f..1567c56 100644 --- a/internal/util/crypto/aes_util.go +++ b/internal/util/crypto/aes_util.go @@ -26,7 +26,7 @@ import ( "sync" "unicode" - loggertype "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" + loggertype "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) diff --git a/internal/util/logger/logger.go b/internal/util/logger/logger.go index 67d4652..cc657eb 100644 --- a/internal/util/logger/logger.go +++ b/internal/util/logger/logger.go @@ -18,65 +18,64 @@ package logger import ( - "io" - "os" - - "github.com/go-logr/logr" - "github.com/go-logr/zapr" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" + "io" + "os" + + "github.com/go-logr/logr" + "github.com/go-logr/zapr" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" ) type Logger struct { - logr.Logger - out io.Writer - logging *logger.HertzBeatLogging - sugaredLogger *zap.SugaredLogger + logr.Logger + out io.Writer + logging *logger.HertzBeatLogging + sugaredLogger *zap.SugaredLogger } func NewLogger(w io.Writer, logging *logger.HertzBeatLogging) Logger { - logger := initZapLogger(w, logging, logging.Level[logger.LogComponentHertzbeatDefault]) + logger := initZapLogger(w, logging, logging.Level[logger.LogComponentHertzbeatDefault]) - return Logger{ - Logger: zapr.NewLogger(logger), - out: w, - logging: logging, - sugaredLogger: logger.Sugar(), - } + return Logger{ + Logger: zapr.NewLogger(logger), + out: w, + logging: logging, + sugaredLogger: logger.Sugar(), + } } func FileLogger(file, name string, level logger.LogLevel) Logger { - writer, err := os.OpenFile(file, os.O_WRONLY, 0o666) - if err != nil { - panic(err) - } + writer, err := os.OpenFile(file, os.O_WRONLY, 0o666) + if err != nil { + panic(err) + } - logging := logger.DefaultHertzbeatLogging() - logger := initZapLogger(writer, logging, level) + logging := logger.DefaultHertzbeatLogging() + logger := initZapLogger(writer, logging, level) - return Logger{ - Logger: zapr.NewLogger(logger).WithName(name), - logging: logging, - out: writer, - sugaredLogger: logger.Sugar(), - } + return Logger{ + Logger: zapr.NewLogger(logger).WithName(name), + logging: logging, + out: writer, + sugaredLogger: logger.Sugar(), + } } func DefaultLogger(out io.Writer, level logger.LogLevel) Logger { - logging := logger.DefaultHertzbeatLogging() - logger := initZapLogger(out, logging, level) + logging := logger.DefaultHertzbeatLogging() + logger := initZapLogger(out, logging, level) - return Logger{ - Logger: zapr.NewLogger(logger), - out: out, - logging: logging, - sugaredLogger: logger.Sugar(), - } + return Logger{ + Logger: zapr.NewLogger(logger), + out: out, + logging: logging, + sugaredLogger: logger.Sugar(), + } } // WithName returns a new Logger instance with the specified name element added @@ -86,23 +85,23 @@ func DefaultLogger(out io.Writer, level logger.LogLevel) Logger { // more information). func (l Logger) WithName(name string) Logger { - logLevel := l.logging.Level[logger.HertzbeatLogComponent(name)] - logger := initZapLogger(l.out, l.logging, logLevel) + logLevel := l.logging.Level[logger.HertzbeatLogComponent(name)] + logger := initZapLogger(l.out, l.logging, logLevel) - return Logger{ - Logger: zapr.NewLogger(logger).WithName(name), - logging: l.logging, - out: l.out, - sugaredLogger: logger.Sugar().Named(name), - } + return Logger{ + Logger: zapr.NewLogger(logger).WithName(name), + logging: l.logging, + out: l.out, + sugaredLogger: logger.Sugar().Named(name), + } } // WithValues returns a new Logger instance with additional key/value pairs. // See Info for documentation on how key/value pairs work. func (l Logger) WithValues(keysAndValues ...interface{}) Logger { - l.Logger = l.Logger.WithValues(keysAndValues...) - return l + l.Logger = l.Logger.WithValues(keysAndValues...) + return l } // A Sugar wraps the base Logger functionality in a slower, but less @@ -125,13 +124,13 @@ func (l Logger) WithValues(keysAndValues ...interface{}) Logger { // Infoln(...any) Println-style logger func (l Logger) Sugar() *zap.SugaredLogger { - return l.sugaredLogger + return l.sugaredLogger } func initZapLogger(w io.Writer, logging *logger.HertzBeatLogging, level logger.LogLevel) *zap.Logger { - parseLevel, _ := zapcore.ParseLevel(string(logging.DefaultHertzBeatLoggingLevel(level))) - core := zapcore.NewCore(zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()), zapcore.AddSync(w), zap.NewAtomicLevelAt(parseLevel)) + parseLevel, _ := zapcore.ParseLevel(string(logging.DefaultHertzBeatLoggingLevel(level))) + core := zapcore.NewCore(zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()), zapcore.AddSync(w), zap.NewAtomicLevelAt(parseLevel)) - return zap.New(core, zap.AddCaller()) + return zap.New(core, zap.AddCaller()) } diff --git a/internal/util/logger/logger_test.go b/internal/util/logger/logger_test.go index c3dd2cf..b3c7844 100644 --- a/internal/util/logger/logger_test.go +++ b/internal/util/logger/logger_test.go @@ -26,7 +26,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zapcore" - loggertype "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" + loggertype "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" ) func TestZapLogLevel(t *testing.T) { diff --git a/internal/util/param/param_replacer.go b/internal/util/param/param_replacer.go index 984e8cd..4ee423f 100644 --- a/internal/util/param/param_replacer.go +++ b/internal/util/param/param_replacer.go @@ -26,9 +26,9 @@ import ( "strconv" "strings" - jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" - loggertype "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job" + protocol2 "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol" + loggertype "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/crypto" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) @@ -44,7 +44,7 @@ func NewReplacer() *Replacer { // PreprocessJobPasswords decrypts passwords in job configmap once during job creation // This permanently replaces encrypted passwords with decrypted ones in the job configmap -func (r *Replacer) PreprocessJobPasswords(job *jobtypes.Job) error { +func (r *Replacer) PreprocessJobPasswords(job *job.Job) error { if job == nil || job.Configmap == nil { return nil } @@ -71,7 +71,7 @@ func (r *Replacer) PreprocessJobPasswords(job *jobtypes.Job) error { } // ReplaceJobParams replaces all parameter placeholders in job configuration -func (r *Replacer) ReplaceJobParams(job *jobtypes.Job) (*jobtypes.Job, error) { +func (r *Replacer) ReplaceJobParams(job *job.Job) (*job.Job, error) { if job == nil { return nil, fmt.Errorf("job is nil") } @@ -97,7 +97,7 @@ func (r *Replacer) ReplaceJobParams(job *jobtypes.Job) (*jobtypes.Job, error) { // createParamMapSimple creates a parameter map from configmap entries // Assumes passwords have already been decrypted by PreprocessJobPasswords -func (r *Replacer) createParamMapSimple(configmap []jobtypes.Configmap) map[string]string { +func (r *Replacer) createParamMapSimple(configmap []job.Configmap) map[string]string { paramMap := make(map[string]string) for _, config := range configmap { @@ -129,7 +129,7 @@ func (r *Replacer) createParamMapSimple(configmap []jobtypes.Configmap) map[stri // ReplaceMetricsParams replaces parameters in metrics configuration // Exported method to be called by MetricsCollector -func (r *Replacer) ReplaceMetricsParams(metrics *jobtypes.Metrics, paramMap map[string]string) error { +func (r *Replacer) ReplaceMetricsParams(metrics *job.Metrics, paramMap map[string]string) error { // 1. JDBC if metrics.JDBC != nil { r.replaceJDBCParams(metrics.JDBC, paramMap) @@ -159,7 +159,7 @@ func (r *Replacer) ReplaceMetricsParams(metrics *jobtypes.Metrics, paramMap map[ } // replaceHTTPParams specific replacement logic for HTTPProtocol struct -func (r *Replacer) replaceHTTPParams(http *protocol.HTTPProtocol, paramMap map[string]string) { +func (r *Replacer) replaceHTTPParams(http *protocol2.HTTPProtocol, paramMap map[string]string) { http.URL = r.replaceParamPlaceholders(http.URL, paramMap) http.Method = r.replaceParamPlaceholders(http.Method, paramMap) http.Body = r.replaceParamPlaceholders(http.Body, paramMap) @@ -207,7 +207,7 @@ func (r *Replacer) replaceHTTPParams(http *protocol.HTTPProtocol, paramMap map[s } // replaceRedisParams specific replacement logic for RedisProtocol struct -func (r *Replacer) replaceRedisParams(redis *protocol.RedisProtocol, paramMap map[string]string) { +func (r *Replacer) replaceRedisParams(redis *protocol2.RedisProtocol, paramMap map[string]string) { redis.Host = r.replaceParamPlaceholders(redis.Host, paramMap) redis.Port = r.replaceParamPlaceholders(redis.Port, paramMap) redis.Username = r.replaceParamPlaceholders(redis.Username, paramMap) @@ -223,7 +223,7 @@ func (r *Replacer) replaceRedisParams(redis *protocol.RedisProtocol, paramMap ma } // replaceSSHParams specific replacement logic for SSHProtocol struct -func (r *Replacer) replaceSSHParams(ssh *protocol.SSHProtocol, paramMap map[string]string) { +func (r *Replacer) replaceSSHParams(ssh *protocol2.SSHProtocol, paramMap map[string]string) { ssh.Host = r.replaceParamPlaceholders(ssh.Host, paramMap) ssh.Port = r.replaceParamPlaceholders(ssh.Port, paramMap) ssh.Username = r.replaceParamPlaceholders(ssh.Username, paramMap) @@ -244,7 +244,7 @@ func (r *Replacer) replaceSSHParams(ssh *protocol.SSHProtocol, paramMap map[stri } // replaceJDBCParams specific replacement logic for JDBCProtocol struct -func (r *Replacer) replaceJDBCParams(jdbc *protocol.JDBCProtocol, paramMap map[string]string) { +func (r *Replacer) replaceJDBCParams(jdbc *protocol2.JDBCProtocol, paramMap map[string]string) { jdbc.Host = r.replaceParamPlaceholders(jdbc.Host, paramMap) jdbc.Port = r.replaceParamPlaceholders(jdbc.Port, paramMap) jdbc.Platform = r.replaceParamPlaceholders(jdbc.Platform, paramMap) @@ -265,7 +265,7 @@ func (r *Replacer) replaceJDBCParams(jdbc *protocol.JDBCProtocol, paramMap map[s } // replaceBasicMetricsParams replaces parameters in basic metrics fields -func (r *Replacer) replaceBasicMetricsParams(metrics *jobtypes.Metrics, paramMap map[string]string) error { +func (r *Replacer) replaceBasicMetricsParams(metrics *job.Metrics, paramMap map[string]string) error { metrics.Host = r.replaceParamPlaceholders(metrics.Host, paramMap) metrics.Port = r.replaceParamPlaceholders(metrics.Port, paramMap) metrics.Timeout = r.replaceParamPlaceholders(metrics.Timeout, paramMap) @@ -321,14 +321,14 @@ func (r *Replacer) ExtractProtocolConfig(protocolInterface interface{}, targetSt } // ExtractJDBCConfig extracts and processes JDBC configuration -func (r *Replacer) ExtractJDBCConfig(jdbcInterface interface{}) (*protocol.JDBCProtocol, error) { +func (r *Replacer) ExtractJDBCConfig(jdbcInterface interface{}) (*protocol2.JDBCProtocol, error) { if jdbcInterface == nil { return nil, nil } - if jdbcConfig, ok := jdbcInterface.(*protocol.JDBCProtocol); ok { + if jdbcConfig, ok := jdbcInterface.(*protocol2.JDBCProtocol); ok { return jdbcConfig, nil } - var jdbcConfig protocol.JDBCProtocol + var jdbcConfig protocol2.JDBCProtocol if err := r.ExtractProtocolConfig(jdbcInterface, &jdbcConfig); err != nil { return nil, fmt.Errorf("failed to extract JDBC config: %w", err) } @@ -336,14 +336,14 @@ func (r *Replacer) ExtractJDBCConfig(jdbcInterface interface{}) (*protocol.JDBCP } // ExtractHTTPConfig extracts and processes HTTP configuration -func (r *Replacer) ExtractHTTPConfig(httpInterface interface{}) (*protocol.HTTPProtocol, error) { +func (r *Replacer) ExtractHTTPConfig(httpInterface interface{}) (*protocol2.HTTPProtocol, error) { if httpInterface == nil { return nil, nil } - if httpConfig, ok := httpInterface.(*protocol.HTTPProtocol); ok { + if httpConfig, ok := httpInterface.(*protocol2.HTTPProtocol); ok { return httpConfig, nil } - var httpConfig protocol.HTTPProtocol + var httpConfig protocol2.HTTPProtocol if err := r.ExtractProtocolConfig(httpInterface, &httpConfig); err != nil { return nil, fmt.Errorf("failed to extract HTTP config: %w", err) } @@ -351,14 +351,14 @@ func (r *Replacer) ExtractHTTPConfig(httpInterface interface{}) (*protocol.HTTPP } // ExtractSSHConfig extracts and processes SSH configuration -func (r *Replacer) ExtractSSHConfig(sshInterface interface{}) (*protocol.SSHProtocol, error) { +func (r *Replacer) ExtractSSHConfig(sshInterface interface{}) (*protocol2.SSHProtocol, error) { if sshInterface == nil { return nil, nil } - if sshConfig, ok := sshInterface.(*protocol.SSHProtocol); ok { + if sshConfig, ok := sshInterface.(*protocol2.SSHProtocol); ok { return sshConfig, nil } - var sshConfig protocol.SSHProtocol + var sshConfig protocol2.SSHProtocol if err := r.ExtractProtocolConfig(sshInterface, &sshConfig); err != nil { return nil, fmt.Errorf("failed to extract SSH config: %w", err) } diff --git a/internal/util/ssh/ssh_helper.go b/internal/util/ssh/ssh_helper.go index 77b7d21..03bc7ce 100644 --- a/internal/util/ssh/ssh_helper.go +++ b/internal/util/ssh/ssh_helper.go @@ -28,8 +28,8 @@ import ( "strings" "golang.org/x/crypto/ssh" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) diff --git a/internal/util/ssh/ssh_helper_test.go b/internal/util/ssh/ssh_helper_test.go index 4cbc6fd..9bda9d8 100644 --- a/internal/util/ssh/ssh_helper_test.go +++ b/internal/util/ssh/ssh_helper_test.go @@ -31,9 +31,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/crypto/ssh" + "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol" + loggertype "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger" - "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol" - loggertype "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger" "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger" ) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
