This is an automated email from the ASF dual-hosted git repository.

shown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 51f252a  refactor: adjust layout (#35)
51f252a is described below

commit 51f252aadf158891826a03c13bcdad3e22744fee
Author: shown <[email protected]>
AuthorDate: Sun Dec 14 14:13:56 2025 +0800

    refactor: adjust layout (#35)
---
 cmd/main.go                                        |  16 +-
 internal/banner/embed.go                           |   4 +-
 internal/cmd/server.go                             |  16 +-
 internal/collector/basic/http/http_collector.go    | 596 ---------------------
 internal/collector/common/transport/transport.go   | 277 ----------
 internal/{collector => }/config/config.go          |   6 +-
 internal/{collector => }/config/config_factory.go  |   4 +-
 internal/{collector => }/config/config_test.go     |   0
 internal/{collector => }/config/env_config.go      |   4 +-
 internal/{collector => }/config/unified_config.go  |   4 +-
 internal/{collector/basic/dns => job/cache}/.keep  |   0
 .../collect/dispatch/metrics_collector.go          |   6 +-
 .../collect/dispatch/metrics_collector_test.go     |   4 +-
 .../common => job}/collect/lazy_message_router.go  |   2 +-
 .../collect/metrics/hertzbeat_metrics_collector.go |   0
 .../common => job}/collect/result_handler.go       |   6 +-
 .../collect/strategy/strategy_factory.go           |   2 +-
 .../common => job}/dispatcher/common_dispatcher.go |   4 +-
 .../basic/ftp => job/dispatcher/entrance}/.keep    |   0
 .../basic/icmp => job/dispatcher/exporter}/.keep   |   0
 .../dispatcher/hashed_wheel_timer.go               |   2 +-
 .../common => job}/dispatcher/time_wheel.go        |  22 +-
 .../common => job}/dispatcher/wheel_timer_task.go  |  14 +-
 .../common => job}/router/message_router.go        |  18 +-
 .../common/job => job/server}/job_server.go        |  16 +-
 .../collector_types.go => metrics/metrics.go}      |   6 +-
 .../basic/database/jdbc_collector.go               |  14 +-
 .../basic/database/jdbc_collector_test.go          |   0
 .../basic/imap => protocol/basic/dns}/.keep        |   0
 .../basic/ipmi2 => protocol/basic/ftp}/.keep       |   0
 internal/protocol/basic/http/http_collector.go     | 596 +++++++++++++++++++++
 .../basic/memcached => protocol/basic/icmp}/.keep  |   0
 .../basic/modbus => protocol/basic/imap}/.keep     |   0
 .../basic/mqtt => protocol/basic/ipmi2}/.keep      |   0
 .../basic/nginx => protocol/basic/memcached}/.keep |   0
 .../basic/ntp => protocol/basic/modbus}/.keep      |   0
 .../basic/plc => protocol/basic/mqtt}/.keep        |   0
 .../basic/pop3 => protocol/basic/nginx}/.keep      |   0
 .../basic/prometheus => protocol/basic/ntp}/.keep  |   0
 .../basic/push => protocol/basic/plc}/.keep        |   0
 .../basic/redfish => protocol/basic/pop3}/.keep    |   0
 .../basic/s7 => protocol/basic/prometheus}/.keep   |   0
 .../basic/script => protocol/basic/push}/.keep     |   0
 .../basic/sd => protocol/basic/redfish}/.keep      |   0
 .../basic/redis/redis_collector.go                 |  14 +-
 .../basic/redis/redis_collector_test.go            |   6 +-
 .../basic/smtp => protocol/basic/s7}/.keep         |   0
 .../basic/snmp => protocol/basic/script}/.keep     |   0
 .../basic/telnet => protocol/basic/sd}/.keep       |   0
 .../basic/udp => protocol/basic/smtp}/.keep        |   0
 .../basic/websocket => protocol/basic/snmp}/.keep  |   0
 .../basic/ssh/ssh_collector.go                     |   6 +-
 .../common/cache => protocol/basic/telnet}/.keep   |   0
 .../entrance => protocol/basic/udp}/.keep          |   0
 .../exporter => protocol/basic/websocket}/.keep    |   0
 .../{collector => protocol}/extension/kafka/.keep  |   0
 .../extension/milvus/milvus_collector.go           |   4 +-
 .../extension/milvus/milvus_collector_test.go      |  44 +-
 .../extension/mongodb/.keep                        |   0
 .../extension/nebulagraph/.keep                    |   0
 .../extension/rocketmq/.keep                       |   0
 .../basic => protocol}/standard/imports.go         |  10 +-
 internal/{collector/common => }/server/server.go   |   4 +-
 .../{collector/common => }/server/server_test.go   |   0
 internal/transport/processors.go                   |   4 +-
 internal/transport/transport.go                    | 276 ++++++++++
 .../common => }/types/collector/collector_types.go |   0
 .../common => }/types/config/config_types.go       |   0
 .../common => }/types/err/error_types.go           |   0
 .../{collector/common => }/types/job/job_types.go  |   0
 .../common => }/types/job/metrics_types.go         |  56 +-
 .../types/job/protocol/common_request_protocol.go  |   0
 .../types/job/protocol/consul_sd_protocol.go       |   0
 .../types/job/protocol/http_protocol.go            |   0
 .../types/job/protocol/jdbc_protocol.go            |   0
 .../common => }/types/job/protocol/jmx_protocol.go |   0
 .../types/job/protocol/milvus_protocol.go          |   0
 .../types/job/protocol/mongodb_protocol.go         |   0
 .../types/job/protocol/redis_protocol.go           |   0
 .../types/job/protocol/snmp_protocol.go            |   0
 .../common => }/types/job/protocol/ssh_protocol.go |   0
 .../common => }/types/job/protocol/ssh_tunnel.go   |   0
 .../types/job/protocol/zookeeper_sd_protocol.go    |   0
 .../common => }/types/job/timeout_types.go         |   0
 .../common => }/types/logger/logging_types.go      |   0
 internal/util/arrow/arrow_serializer.go            |   2 +-
 internal/util/crypto/aes_util.go                   |   2 +-
 internal/util/logger/logger.go                     | 107 ++--
 internal/util/logger/logger_test.go                |   2 +-
 internal/util/param/param_replacer.go              |  42 +-
 internal/util/ssh/ssh_helper.go                    |   2 +-
 internal/util/ssh/ssh_helper_test.go               |   4 +-
 92 files changed, 1109 insertions(+), 1115 deletions(-)

diff --git a/cmd/main.go b/cmd/main.go
index 2b497b4..8acd589 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -18,17 +18,17 @@
 package main
 
 import (
-       "fmt"
-       "os"
+  "fmt"
+  "os"
 
-       "hertzbeat.apache.org/hertzbeat-collector-go/cmd/root"
-       _ 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/standard"
+  "hertzbeat.apache.org/hertzbeat-collector-go/cmd/root"
+  _ "hertzbeat.apache.org/hertzbeat-collector-go/internal/protocol/standard"
 )
 
 func main() {
 
-       if err := root.GetRootCommand().Execute(); err != nil {
-               _, _ = fmt.Fprintln(os.Stderr, err)
-               os.Exit(1)
-       }
+  if err := root.GetRootCommand().Execute(); err != nil {
+    _, _ = fmt.Fprintln(os.Stderr, err)
+    os.Exit(1)
+  }
 }
diff --git a/internal/banner/embed.go b/internal/banner/embed.go
index e18bdd1..f3c3aa2 100644
--- a/internal/banner/embed.go
+++ b/internal/banner/embed.go
@@ -24,8 +24,8 @@ import (
        "strconv"
        "text/template"
 
-       clrserver 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
-       bannertypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/err"
+       clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server"
+       bannertypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/err"
 )
 
 const (
diff --git a/internal/cmd/server.go b/internal/cmd/server.go
index 687f52a..5980263 100644
--- a/internal/cmd/server.go
+++ b/internal/cmd/server.go
@@ -28,16 +28,16 @@ import (
        "syscall"
 
        "github.com/spf13/cobra"
+       cfgloader "hertzbeat.apache.org/hertzbeat-collector-go/internal/config"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect"
+       jobserver 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/job/server"
+       clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server"
+       transportserver 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
+       collectortypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector"
+       configtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
+       collectorerr 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/err"
 
        bannerouter 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/banner"
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect"
-       jobserver 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/job"
-       clrserver 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
-       transportserver 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/transport"
-       collectortypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector"
-       configtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config"
-       collectorerr 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/err"
-       cfgloader 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config"
 )
 
 var (
diff --git a/internal/collector/basic/http/http_collector.go 
b/internal/collector/basic/http/http_collector.go
deleted file mode 100644
index 32632a5..0000000
--- a/internal/collector/basic/http/http_collector.go
+++ /dev/null
@@ -1,596 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package http
-
-import (
-  "bytes"
-  "crypto/md5"
-  "crypto/rand"
-  "crypto/tls"
-  "encoding/hex"
-  "encoding/json"
-  "fmt"
-  "io"
-  "net/http"
-  "strconv"
-  "strings"
-  "time"
-
-  "github.com/prometheus/common/expfmt"
-  
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
-  
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
-  
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
-  "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
-  "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
-)
-
-func init() {
-  strategy.RegisterFactory(ProtocolHTTP, func(logger logger.Logger) 
strategy.Collector {
-    return NewHTTPCollector(logger)
-  })
-}
-
-const (
-  ProtocolHTTP = "http"
-
-  // Parse Types
-  ParseTypeDefault    = "default"
-  ParseTypeJsonPath   = "jsonPath"
-  ParseTypePrometheus = "prometheus"
-  ParseTypeWebsite    = "website"
-  ParseTypeHeader     = "header"
-
-  // Auth Types
-  AuthTypeBasic  = "Basic Auth"
-  AuthTypeBearer = "Bearer Token"
-  AuthTypeDigest = "Digest Auth"
-)
-
-type HTTPCollector struct {
-  logger logger.Logger
-}
-
-func NewHTTPCollector(log logger.Logger) *HTTPCollector {
-  return &HTTPCollector{
-    logger: log.WithName("http-collector"),
-  }
-}
-
-func (hc *HTTPCollector) Protocol() string {
-  return ProtocolHTTP
-}
-
-func (hc *HTTPCollector) PreCheck(metrics *job.Metrics) error {
-  if metrics == nil || metrics.HTTP == nil {
-    return fmt.Errorf("http configuration is missing")
-  }
-  return nil
-}
-
-func (hc *HTTPCollector) Collect(metrics *job.Metrics) 
*job.CollectRepMetricsData {
-  start := time.Now()
-
-  // MetricsCollector has already performed parameter replacement on 
metrics.HTTP
-  httpConfig := metrics.HTTP
-
-  // 1. Prepare URL
-  targetURL := httpConfig.URL
-
-  // Parse SSL bool string
-  isSSL := false
-  if httpConfig.SSL != "" {
-    if val, err := strconv.ParseBool(httpConfig.SSL); err == nil {
-      isSSL = val
-    }
-  }
-
-  if targetURL == "" {
-    schema := "http"
-    if isSSL {
-      schema = "https"
-    }
-    // Use metrics.ConfigMap values if URL is empty
-    // Note: metrics.ConfigMap is already populated with job.Configmap values
-    host := metrics.ConfigMap["host"]
-    port := metrics.ConfigMap["port"]
-    if host != "" && port != "" {
-      targetURL = fmt.Sprintf("%s://%s:%s", schema, host, port)
-    }
-  }
-
-  if targetURL != "" && !strings.HasPrefix(targetURL, "http") {
-    schema := "http"
-    if isSSL {
-      schema = "https"
-    }
-    targetURL = fmt.Sprintf("%s://%s", schema, targetURL)
-  }
-
-  if targetURL == "" {
-    return hc.createFailResponse(metrics, constants.CollectFail, "target URL 
is empty")
-  }
-
-  // 2. Create Request
-  req, err := hc.createRequest(httpConfig, targetURL)
-  if err != nil {
-    return hc.createFailResponse(metrics, constants.CollectFail, 
fmt.Sprintf("failed to create request: %v", err))
-  }
-
-  // 3. Create Client
-  timeoutStr := metrics.Timeout
-  if httpConfig.Timeout != "" {
-    timeoutStr = httpConfig.Timeout
-  }
-
-  timeout := hc.getTimeout(timeoutStr)
-  client := &http.Client{
-    Timeout: timeout,
-    Transport: &http.Transport{
-      TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
-    },
-  }
-
-  // 4. Execute Request (with Digest retry logic)
-  resp, err := client.Do(req)
-
-  // Handle Digest Auth Challenge (401)
-  if err == nil && resp.StatusCode == http.StatusUnauthorized &&
-    httpConfig.Authorization != nil &&
-    strings.EqualFold(httpConfig.Authorization.Type, AuthTypeDigest) {
-
-    // Close the first response body
-    io.Copy(io.Discard, resp.Body)
-    resp.Body.Close()
-
-    authHeader := resp.Header.Get("WWW-Authenticate")
-    if authHeader != "" {
-      // Create new request with Authorization header
-      digestReq, digestErr := hc.handleDigestAuth(req, authHeader, 
httpConfig.Authorization)
-      if digestErr == nil {
-        // Retry request
-        resp, err = client.Do(digestReq)
-      } else {
-        hc.logger.Error(digestErr, "failed to handle digest auth")
-      }
-    }
-  }
-
-  if err != nil {
-    return hc.createFailResponse(metrics, constants.CollectUnReachable, 
fmt.Sprintf("request failed: %v", err))
-  }
-  defer resp.Body.Close()
-
-  responseTime := time.Since(start).Milliseconds()
-
-  // 5. Parse Response
-  parseType := httpConfig.ParseType
-  if parseType == "" {
-    parseType = ParseTypeDefault
-  }
-
-  var responseData *job.CollectRepMetricsData
-
-  // Read body
-  bodyBytes, err := io.ReadAll(resp.Body)
-  if err != nil {
-    return hc.createFailResponse(metrics, constants.CollectFail, 
fmt.Sprintf("failed to read body: %v", err))
-  }
-
-  switch parseType {
-  case ParseTypePrometheus:
-    responseData, err = hc.parsePrometheus(bodyBytes, metrics)
-  case ParseTypeWebsite:
-    responseData, err = hc.parseWebsite(bodyBytes, resp.StatusCode, 
responseTime, metrics, httpConfig)
-  case ParseTypeHeader:
-    responseData, err = hc.parseHeader(resp.Header, metrics)
-  case ParseTypeJsonPath, ParseTypeDefault:
-    responseData, err = hc.parseJsonPath(bodyBytes, metrics, responseTime, 
httpConfig)
-  default:
-    responseData, err = hc.parseJsonPath(bodyBytes, metrics, responseTime, 
httpConfig)
-  }
-
-  if err != nil {
-    hc.logger.Error(err, "parse response failed", "type", parseType)
-    return hc.createFailResponse(metrics, constants.CollectFail, 
fmt.Sprintf("parse error: %v", err))
-  }
-
-  hc.fillCommonFields(responseData, metrics.AliasFields, responseTime, 
resp.StatusCode)
-
-  return responseData
-}
-
-// handleDigestAuth generates a new request with the Digest Authorization 
header
-func (hc *HTTPCollector) handleDigestAuth(originalReq *http.Request, 
authHeader string, authConfig *protocol.Authorization) (*http.Request, error) {
-  params := hc.parseAuthHeader(authHeader)
-  realm := params["realm"]
-  nonce := params["nonce"]
-  qop := params["qop"]
-  opaque := params["opaque"]
-  algorithm := params["algorithm"]
-  if algorithm == "" {
-    algorithm = "MD5"
-  }
-
-  if realm == "" || nonce == "" {
-    return nil, fmt.Errorf("missing realm or nonce in digest header")
-  }
-
-  username := authConfig.DigestAuthUsername
-  password := authConfig.DigestAuthPassword
-  ha1 := hc.md5Hex(fmt.Sprintf("%s:%s:%s", username, realm, password))
-
-  method := originalReq.Method
-  uri := originalReq.URL.RequestURI()
-  ha2 := hc.md5Hex(fmt.Sprintf("%s:%s", method, uri))
-
-  nc := "00000001"
-  cnonce := hc.generateCnonce()
-
-  var responseStr string
-  if qop == "" {
-    responseStr = hc.md5Hex(fmt.Sprintf("%s:%s:%s", ha1, nonce, ha2))
-  } else if strings.Contains(qop, "auth") {
-    responseStr = hc.md5Hex(fmt.Sprintf("%s:%s:%s:%s:%s:%s", ha1, nonce, nc, 
cnonce, "auth", ha2))
-  } else {
-    return nil, fmt.Errorf("unsupported qop: %s", qop)
-  }
-
-  authVal := fmt.Sprintf(`Digest username="%s", realm="%s", nonce="%s", 
uri="%s", response="%s", algorithm="%s"`,
-    username, realm, nonce, uri, responseStr, algorithm)
-
-  if opaque != "" {
-    authVal += fmt.Sprintf(`, opaque="%s"`, opaque)
-  }
-  if qop != "" {
-    authVal += fmt.Sprintf(`, qop=auth, nc=%s, cnonce="%s"`, nc, cnonce)
-  }
-
-  var newReq *http.Request
-  if originalReq.GetBody != nil {
-    body, _ := originalReq.GetBody()
-    newReq, _ = http.NewRequest(originalReq.Method, originalReq.URL.String(), 
body)
-  } else {
-    newReq, _ = http.NewRequest(originalReq.Method, originalReq.URL.String(), 
nil)
-  }
-
-  for k, v := range originalReq.Header {
-    newReq.Header[k] = v
-  }
-
-  newReq.Header.Set("Authorization", authVal)
-  return newReq, nil
-}
-
-func (hc *HTTPCollector) parseAuthHeader(header string) map[string]string {
-  header = strings.TrimPrefix(header, "Digest ")
-  result := make(map[string]string)
-
-  pairs := strings.Split(header, ",")
-  for _, pair := range pairs {
-    parts := strings.SplitN(strings.TrimSpace(pair), "=", 2)
-    if len(parts) == 2 {
-      key := strings.TrimSpace(parts[0])
-      val := strings.TrimSpace(parts[1])
-      val = strings.Trim(val, "\"")
-      result[key] = val
-    }
-  }
-  return result
-}
-
-func (hc *HTTPCollector) md5Hex(s string) string {
-  hash := md5.Sum([]byte(s))
-  return hex.EncodeToString(hash[:])
-}
-
-func (hc *HTTPCollector) generateCnonce() string {
-  b := make([]byte, 8)
-  io.ReadFull(rand.Reader, b)
-  return hex.EncodeToString(b)
-}
-
-func (hc *HTTPCollector) createRequest(config *protocol.HTTPProtocol, 
targetURL string) (*http.Request, error) {
-  method := strings.ToUpper(config.Method)
-  if method == "" {
-    method = "GET"
-  }
-
-  var body io.Reader
-  if config.Body != "" {
-    body = strings.NewReader(config.Body)
-  }
-
-  req, err := http.NewRequest(method, targetURL, body)
-  if err != nil {
-    return nil, err
-  }
-
-  for k, v := range config.Headers {
-    req.Header.Set(k, v)
-  }
-
-  if config.Authorization != nil {
-    auth := config.Authorization
-    if auth.Type == "" || strings.EqualFold(auth.Type, AuthTypeBasic) {
-      if auth.BasicAuthUsername != "" && auth.BasicAuthPassword != "" {
-        req.SetBasicAuth(auth.BasicAuthUsername, auth.BasicAuthPassword)
-      }
-    } else if strings.EqualFold(auth.Type, AuthTypeBearer) {
-      if auth.BearerTokenToken != "" {
-        req.Header.Set("Authorization", "Bearer "+auth.BearerTokenToken)
-      }
-    }
-  }
-
-  if len(config.Params) > 0 {
-    q := req.URL.Query()
-    for k, v := range config.Params {
-      q.Add(k, v)
-    }
-    req.URL.RawQuery = q.Encode()
-  }
-
-  return req, nil
-}
-
-// --- Parsers ---
-
-func (hc *HTTPCollector) parsePrometheus(body []byte, metrics *job.Metrics) 
(*job.CollectRepMetricsData, error) {
-  response := hc.createSuccessResponse(metrics)
-  var parser expfmt.TextParser
-  metricFamilies, err := parser.TextToMetricFamilies(bytes.NewReader(body))
-  if err != nil {
-    return nil, err
-  }
-  now := time.Now().UnixMilli()
-  for _, mf := range metricFamilies {
-    for _, m := range mf.Metric {
-      row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))}
-      labels := make(map[string]string)
-      for _, pair := range m.Label {
-        labels[pair.GetName()] = pair.GetValue()
-      }
-      hasValue := false
-      for i, field := range metrics.AliasFields {
-        if field == constants.NULL_VALUE {
-          continue
-        }
-        if field == "value" || field == "prom_value" {
-          if m.Gauge != nil {
-            row.Columns[i] = fmt.Sprintf("%f", m.Gauge.GetValue())
-          } else if m.Counter != nil {
-            row.Columns[i] = fmt.Sprintf("%f", m.Counter.GetValue())
-          } else if m.Untyped != nil {
-            row.Columns[i] = fmt.Sprintf("%f", m.Untyped.GetValue())
-          }
-          hasValue = true
-        } else {
-          if val, ok := labels[field]; ok {
-            row.Columns[i] = val
-            hasValue = true
-          } else {
-            row.Columns[i] = constants.NULL_VALUE
-          }
-        }
-      }
-      if hasValue {
-        response.Values = append(response.Values, row)
-      }
-    }
-  }
-  response.Time = now
-  return response, nil
-}
-
-func (hc *HTTPCollector) parseWebsite(body []byte, statusCode int, 
responseTime int64, metrics *job.Metrics, httpConfig *protocol.HTTPProtocol) 
(*job.CollectRepMetricsData, error) {
-  response := hc.createSuccessResponse(metrics)
-  row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))}
-  keyword := httpConfig.Keyword
-  keywordCount := 0
-  if keyword != "" {
-    keywordCount = strings.Count(string(body), keyword)
-  }
-  for i, field := range metrics.AliasFields {
-    switch strings.ToLower(field) {
-    case strings.ToLower(constants.StatusCode):
-      row.Columns[i] = strconv.Itoa(statusCode)
-    case strings.ToLower(constants.RESPONSE_TIME):
-      row.Columns[i] = strconv.FormatInt(responseTime, 10)
-    case "keyword":
-      row.Columns[i] = strconv.Itoa(keywordCount)
-    default:
-      row.Columns[i] = constants.NULL_VALUE
-    }
-  }
-  response.Values = append(response.Values, row)
-  return response, nil
-}
-
-func (hc *HTTPCollector) parseHeader(header http.Header, metrics *job.Metrics) 
(*job.CollectRepMetricsData, error) {
-  response := hc.createSuccessResponse(metrics)
-  row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))}
-  for i, field := range metrics.AliasFields {
-    val := header.Get(field)
-    if val != "" {
-      row.Columns[i] = val
-    } else {
-      row.Columns[i] = constants.NULL_VALUE
-    }
-  }
-  response.Values = append(response.Values, row)
-  return response, nil
-}
-
-func (hc *HTTPCollector) parseJsonPath(body []byte, metrics *job.Metrics, 
responseTime int64, httpConfig *protocol.HTTPProtocol) 
(*job.CollectRepMetricsData, error) {
-  response := hc.createSuccessResponse(metrics)
-  var data interface{}
-  decoder := json.NewDecoder(bytes.NewReader(body))
-  decoder.UseNumber()
-  if err := decoder.Decode(&data); err != nil {
-    return response, nil
-  }
-  parseScript := httpConfig.ParseScript
-  root := data
-  if parseScript != "" && parseScript != "$" {
-    root = hc.navigateJson(data, parseScript)
-  }
-  if root == nil {
-    return response, nil
-  }
-  if arr, ok := root.([]interface{}); ok {
-    for _, item := range arr {
-      row := hc.extractRow(item, metrics.AliasFields)
-      response.Values = append(response.Values, row)
-    }
-  } else {
-    row := hc.extractRow(root, metrics.AliasFields)
-    response.Values = append(response.Values, row)
-  }
-  return response, nil
-}
-
-func (hc *HTTPCollector) navigateJson(data interface{}, path string) 
interface{} {
-  path = strings.TrimPrefix(path, "$.")
-  if path == "" {
-    return data
-  }
-  parts := strings.Split(path, ".")
-  current := data
-  for _, part := range parts {
-    if current == nil {
-      return nil
-    }
-    key := part
-    idx := -1
-    if i := strings.Index(part, "["); i > -1 && strings.HasSuffix(part, "]") {
-      key = part[:i]
-      idxStr := part[i+1 : len(part)-1]
-      if val, err := strconv.Atoi(idxStr); err == nil {
-        idx = val
-      }
-    }
-    if key != "" {
-      if m, ok := current.(map[string]interface{}); ok {
-        if val, exists := m[key]; exists {
-          current = val
-        } else {
-          return nil
-        }
-      } else {
-        return nil
-      }
-    }
-    if idx > -1 {
-      if arr, ok := current.([]interface{}); ok {
-        if idx < len(arr) {
-          current = arr[idx]
-        } else {
-          return nil
-        }
-      } else {
-        return nil
-      }
-    }
-  }
-  return current
-}
-
-func (hc *HTTPCollector) extractRow(item interface{}, fields []string) 
job.ValueRow {
-  row := job.ValueRow{Columns: make([]string, len(fields))}
-  m, isMap := item.(map[string]interface{})
-  for i, field := range fields {
-    if strings.EqualFold(field, constants.RESPONSE_TIME) || 
strings.EqualFold(field, constants.StatusCode) {
-      row.Columns[i] = constants.NULL_VALUE
-      continue
-    }
-    var val interface{}
-    if isMap {
-      if v, ok := m[field]; ok {
-        val = v
-      } else {
-        val = hc.navigateJson(item, field)
-      }
-    }
-    if val != nil {
-      row.Columns[i] = fmt.Sprintf("%v", val)
-    } else {
-      row.Columns[i] = constants.NULL_VALUE
-    }
-  }
-  return row
-}
-
-func (hc *HTTPCollector) fillCommonFields(resp *job.CollectRepMetricsData, 
fields []string, responseTime int64, statusCode int) {
-  if resp == nil {
-    return
-  }
-  if len(resp.Fields) == 0 {
-    resp.Fields = make([]job.Field, len(fields))
-    for i, f := range fields {
-      resp.Fields[i] = job.Field{Field: f, Type: constants.TYPE_STRING}
-    }
-  }
-  if len(resp.Values) == 0 {
-    row := job.ValueRow{Columns: make([]string, len(fields))}
-    for k := range row.Columns {
-      row.Columns[k] = constants.NULL_VALUE
-    }
-    resp.Values = append(resp.Values, row)
-  }
-  for i := range resp.Values {
-    for j, field := range fields {
-      if strings.EqualFold(field, constants.RESPONSE_TIME) {
-        resp.Values[i].Columns[j] = strconv.FormatInt(responseTime, 10)
-      }
-      if strings.EqualFold(field, constants.StatusCode) {
-        resp.Values[i].Columns[j] = strconv.Itoa(statusCode)
-      }
-    }
-  }
-}
-
-func (hc *HTTPCollector) getTimeout(timeoutStr string) time.Duration {
-  if timeoutStr == "" {
-    return 10 * time.Second
-  }
-  if val, err := strconv.Atoi(timeoutStr); err == nil {
-    return time.Duration(val) * time.Millisecond
-  }
-  return 10 * time.Second
-}
-
-func (hc *HTTPCollector) createSuccessResponse(metrics *job.Metrics) 
*job.CollectRepMetricsData {
-  return &job.CollectRepMetricsData{
-    Metrics: metrics.Name,
-    Time:    time.Now().UnixMilli(),
-    Code:    constants.CollectSuccess,
-    Msg:     "success",
-    Values:  make([]job.ValueRow, 0),
-  }
-}
-
-func (hc *HTTPCollector) createFailResponse(metrics *job.Metrics, code int, 
msg string) *job.CollectRepMetricsData {
-  return &job.CollectRepMetricsData{
-    Metrics: metrics.Name,
-    Time:    time.Now().UnixMilli(),
-    Code:    code,
-    Msg:     msg,
-  }
-}
diff --git a/internal/collector/common/transport/transport.go 
b/internal/collector/common/transport/transport.go
deleted file mode 100644
index f2edf73..0000000
--- a/internal/collector/common/transport/transport.go
+++ /dev/null
@@ -1,277 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package transport
-
-import (
-  "context"
-  "encoding/json"
-  "fmt"
-  "os"
-
-  pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
-  clrserver 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
-  
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector"
-  configtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config"
-  loggertypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
-  "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config"
-  "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
-  "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
-)
-
-const (
-  // DefaultManagerAddr is the default manager server address (Java Netty 
default port)
-  DefaultManagerAddr = "127.0.0.1:1158"
-  // DefaultProtocol is the default communication protocol for Java 
compatibility
-  DefaultProtocol = "netty"
-  // DefaultMode is the default operation mode
-  DefaultMode = "public"
-  // DefaultIdentity is the default collector identity
-  DefaultIdentity = "collector-go"
-)
-
-type Runner struct {
-  Config       *configtypes.CollectorConfig
-  client       transport.TransportClient
-  jobScheduler transport.JobScheduler
-  clrserver.Server
-}
-
-func New(cfg *configtypes.CollectorConfig) *Runner {
-  return &Runner{
-    Config: cfg,
-    Server: clrserver.Server{
-      Logger: logger.Logger{}, // Will be initialized properly in Start method
-    },
-  }
-}
-
-// SetJobScheduler sets the job scheduler for the transport runner
-func (r *Runner) SetJobScheduler(scheduler transport.JobScheduler) {
-  r.jobScheduler = scheduler
-}
-
-// NewFromConfig creates a new transport runner from collector configuration
-func NewFromConfig(cfg *configtypes.CollectorConfig) *Runner {
-  if cfg == nil {
-    return nil
-  }
-  return New(cfg)
-}
-
-// NewFromEnv creates a new transport runner from environment variables
-func NewFromEnv() *Runner {
-  envLoader := config.NewEnvConfigLoader()
-  cfg := envLoader.LoadFromEnv()
-  return NewFromConfig(cfg)
-}
-
-// NewFromUnifiedConfig creates a new transport runner using unified 
configuration loading
-// It loads from file first, then overrides with environment variables
-func NewFromUnifiedConfig(cfgPath string) (*Runner, error) {
-  unifiedLoader := config.NewUnifiedConfigLoader(cfgPath)
-  cfg, err := unifiedLoader.Load()
-  if err != nil {
-    return nil, err
-  }
-  return NewFromConfig(cfg), nil
-}
-
-func (r *Runner) Start(ctx context.Context) error {
-  // 初始化 Logger 如果它还没有被设置
-  if r.Logger.IsZero() {
-    r.Logger = logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo)
-  }
-  r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", 
r.Info().Name)
-  r.Logger.Info("Starting transport client")
-
-  // 构建 server 地址
-  addr := fmt.Sprintf("%s:%s", r.Config.Collector.Manager.Host, 
r.Config.Collector.Manager.Port)
-  if addr == ":" {
-    // 如果配置为空,使用环境变量或默认值
-    if v := os.Getenv("MANAGER_ADDR"); v != "" {
-      addr = v
-    } else {
-      addr = DefaultManagerAddr
-    }
-  }
-
-  // 确定协议
-  protocol := r.Config.Collector.Manager.Protocol
-  if protocol == "" {
-    if v := os.Getenv("MANAGER_PROTOCOL"); v != "" {
-      protocol = v
-    } else {
-      protocol = DefaultProtocol
-    }
-  }
-
-  r.Logger.Info("Connecting to manager server", "addr", addr, "protocol", 
protocol)
-
-  // 创建客户端
-  factory := &transport.TransportClientFactory{}
-  client, err := factory.CreateClient(protocol, addr)
-  if err != nil {
-    r.Logger.Error(err, "Failed to create transport client")
-    return err
-  }
-
-  // Set the identity on the client if it supports it
-  identity := r.Config.Collector.Identity
-  if identity == "" {
-    identity = DefaultIdentity
-  }
-
-  if nettyClient, ok := client.(*transport.NettyClient); ok {
-    nettyClient.SetIdentity(identity)
-  }
-
-  r.client = client
-
-  // 设置事件处理器
-  switch c := client.(type) {
-  case *transport.GrpcClient:
-    c.SetEventHandler(func(event transport.Event) {
-      switch event.Type {
-      case transport.EventConnected:
-        r.Logger.Info("Connected to manager gRPC server", "addr", 
event.Address)
-        go r.sendOnlineMessage()
-      case transport.EventDisconnected:
-        r.Logger.Info("Disconnected from manager gRPC server", "addr", 
event.Address)
-      case transport.EventConnectFailed:
-        r.Logger.Error(event.Error, "Failed to connect to manager gRPC 
server", "addr", event.Address)
-      }
-    })
-    // Register processors with job scheduler
-    if r.jobScheduler != nil {
-      transport.RegisterDefaultProcessors(c, r.jobScheduler)
-      r.Logger.Info("Registered gRPC processors with job scheduler")
-    } else {
-      transport.RegisterDefaultProcessors(c, nil)
-      r.Logger.Info("Registered gRPC processors without job scheduler")
-    }
-  case *transport.NettyClient:
-    c.SetEventHandler(func(event transport.Event) {
-      switch event.Type {
-      case transport.EventConnected:
-        r.Logger.Info("Connected to manager netty server", "addr", 
event.Address)
-        go r.sendOnlineMessage()
-      case transport.EventDisconnected:
-        r.Logger.Info("Disconnected from manager netty server", "addr", 
event.Address)
-      case transport.EventConnectFailed:
-        r.Logger.Error(event.Error, "Failed to connect to manager netty 
server", "addr", event.Address)
-      }
-    })
-    // Register processors with job scheduler
-    if r.jobScheduler != nil {
-      transport.RegisterDefaultNettyProcessors(c, r.jobScheduler)
-      r.Logger.Info("Registered netty processors with job scheduler")
-    } else {
-      transport.RegisterDefaultNettyProcessors(c, nil)
-      r.Logger.Info("Registered netty processors without job scheduler")
-    }
-  }
-
-  if err := r.client.Start(); err != nil {
-    r.Logger.Error(err, "Failed to start transport client")
-    return err
-  }
-
-  // 创建新的context用于监控关闭信号
-  ctx, cancel := context.WithCancel(ctx)
-  defer cancel()
-
-  // 监听 ctx.Done 优雅关闭
-  go func() {
-    <-ctx.Done()
-    r.Logger.Info("Shutting down transport client...")
-    _ = r.client.Shutdown()
-  }()
-
-  // 阻塞直到 ctx.Done
-  <-ctx.Done()
-  return nil
-}
-
-func (r *Runner) sendOnlineMessage() {
-  if r.client != nil && r.client.IsStarted() {
-    // Use the configured identity
-    identity := r.Config.Collector.Identity
-    if identity == "" {
-      identity = DefaultIdentity
-    }
-
-    // Create CollectorInfo JSON structure as expected by Java server
-    mode := r.Config.Collector.Mode
-    if mode == "" {
-      mode = DefaultMode // Default mode as in Java version
-    }
-
-    collectorInfo := map[string]interface{}{
-      "name":    identity,
-      "ip":      "", // Let server detect IP
-      "version": "1.0.0",
-      "mode":    mode,
-    }
-
-    // Convert to JSON bytes
-    jsonData, err := json.Marshal(collectorInfo)
-    if err != nil {
-      r.Logger.Error(err, "Failed to marshal collector info to JSON")
-      return
-    }
-
-    onlineMsg := &pb.Message{
-      Type:      pb.MessageType_GO_ONLINE,
-      Direction: pb.Direction_REQUEST,
-      Identity:  identity,
-      Msg:       jsonData,
-    }
-
-    r.Logger.Info("Sending online message", "identity", identity, "type", 
onlineMsg.Type)
-
-    if err := r.client.SendMsg(onlineMsg); err != nil {
-      r.Logger.Error(err, "Failed to send online message", "identity", 
identity)
-    } else {
-      r.Logger.Info("Online message sent successfully", "identity", identity)
-    }
-  }
-}
-
-func (r *Runner) Info() collector.Info {
-  return collector.Info{
-    Name: "transport",
-  }
-}
-
-func (r *Runner) Close() error {
-  r.Logger.Info("transport close...")
-  if r.client != nil {
-    _ = r.client.Shutdown()
-  }
-  return nil
-}
-
-// GetClient returns the transport client (for testing and advanced usage)
-func (r *Runner) GetClient() transport.TransportClient {
-  return r.client
-}
-
-// IsConnected returns whether the client is connected and started
-func (r *Runner) IsConnected() bool {
-  return r.client != nil && r.client.IsStarted()
-}
diff --git a/internal/collector/config/config.go b/internal/config/config.go
similarity index 95%
rename from internal/collector/config/config.go
rename to internal/config/config.go
index a5bafdb..a5a299f 100644
--- a/internal/collector/config/config.go
+++ b/internal/config/config.go
@@ -22,10 +22,10 @@ import (
        "os"
 
        "gopkg.in/yaml.v3"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
+       collectortypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/err"
+       loggertypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
 
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config"
-       collectortypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/err"
-       loggertypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
diff --git a/internal/collector/config/config_factory.go 
b/internal/config/config_factory.go
similarity index 98%
rename from internal/collector/config/config_factory.go
rename to internal/config/config_factory.go
index 47b82b5..7854d4f 100644
--- a/internal/collector/config/config_factory.go
+++ b/internal/config/config_factory.go
@@ -25,8 +25,8 @@ import (
        "strconv"
        "strings"
 
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config"
-       loggertypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
+       loggertypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
diff --git a/internal/collector/config/config_test.go 
b/internal/config/config_test.go
similarity index 100%
rename from internal/collector/config/config_test.go
rename to internal/config/config_test.go
diff --git a/internal/collector/config/env_config.go 
b/internal/config/env_config.go
similarity index 96%
rename from internal/collector/config/env_config.go
rename to internal/config/env_config.go
index a81acf3..40c09c1 100644
--- a/internal/collector/config/env_config.go
+++ b/internal/config/env_config.go
@@ -20,8 +20,8 @@ package config
 import (
        "os"
 
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config"
-       loggertypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
+       loggertypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
diff --git a/internal/collector/config/unified_config.go 
b/internal/config/unified_config.go
similarity index 97%
rename from internal/collector/config/unified_config.go
rename to internal/config/unified_config.go
index bd0fde5..2640448 100644
--- a/internal/collector/config/unified_config.go
+++ b/internal/config/unified_config.go
@@ -20,8 +20,8 @@ package config
 import (
        "os"
 
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config"
-       loggertypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
+       loggertypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
diff --git a/internal/collector/basic/dns/.keep b/internal/job/cache/.keep
similarity index 100%
rename from internal/collector/basic/dns/.keep
rename to internal/job/cache/.keep
diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go 
b/internal/job/collect/dispatch/metrics_collector.go
similarity index 98%
rename from internal/collector/common/collect/dispatch/metrics_collector.go
rename to internal/job/collect/dispatch/metrics_collector.go
index 7261ebe..4074275 100644
--- a/internal/collector/common/collect/dispatch/metrics_collector.go
+++ b/internal/job/collect/dispatch/metrics_collector.go
@@ -29,9 +29,9 @@ import (
        "github.com/expr-lang/expr"
        "github.com/expr-lang/expr/vm"
 
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
-       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/strategy"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/unit"
@@ -503,7 +503,7 @@ func formatNumber(value float64) string {
 
 // CollectMetrics collects metrics using the appropriate collector and returns 
results via channel
 // This method uses Go's channel-based concurrency instead of Java's thread 
pools
-func (mc *MetricsCollector) CollectMetrics(metrics *jobtypes.Metrics, job 
*jobtypes.Job, timeout *jobtypes.Timeout) chan *jobtypes.CollectRepMetricsData {
+func (mc *MetricsCollector) CollectMetrics(metrics *jobtypes.Metrics, job 
*jobtypes.Job, _ *jobtypes.Timeout) chan *jobtypes.CollectRepMetricsData {
        resultChan := make(chan *jobtypes.CollectRepMetricsData, 1)
 
        // Start collection in a goroutine (Go's lightweight thread)
diff --git 
a/internal/collector/common/collect/dispatch/metrics_collector_test.go 
b/internal/job/collect/dispatch/metrics_collector_test.go
similarity index 99%
rename from internal/collector/common/collect/dispatch/metrics_collector_test.go
rename to internal/job/collect/dispatch/metrics_collector_test.go
index bae124d..581de71 100644
--- a/internal/collector/common/collect/dispatch/metrics_collector_test.go
+++ b/internal/job/collect/dispatch/metrics_collector_test.go
@@ -22,9 +22,9 @@ package dispatch
 import (
        "testing"
 
-       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
-       loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
+       loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
diff --git a/internal/collector/common/collect/lazy_message_router.go 
b/internal/job/collect/lazy_message_router.go
similarity index 99%
rename from internal/collector/common/collect/lazy_message_router.go
rename to internal/job/collect/lazy_message_router.go
index a038099..9613fb0 100644
--- a/internal/collector/common/collect/lazy_message_router.go
+++ b/internal/job/collect/lazy_message_router.go
@@ -30,8 +30,8 @@ import (
        "github.com/apache/arrow/go/v13/arrow/memory"
 
        pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
-       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
diff --git 
a/internal/collector/common/collect/metrics/hertzbeat_metrics_collector.go 
b/internal/job/collect/metrics/hertzbeat_metrics_collector.go
similarity index 100%
rename from 
internal/collector/common/collect/metrics/hertzbeat_metrics_collector.go
rename to internal/job/collect/metrics/hertzbeat_metrics_collector.go
diff --git a/internal/collector/common/collect/result_handler.go 
b/internal/job/collect/result_handler.go
similarity index 90%
rename from internal/collector/common/collect/result_handler.go
rename to internal/job/collect/result_handler.go
index cd16820..1592e6c 100644
--- a/internal/collector/common/collect/result_handler.go
+++ b/internal/job/collect/result_handler.go
@@ -22,8 +22,8 @@ package collect
 import (
        "fmt"
 
-       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
@@ -35,7 +35,7 @@ type ResultHandlerImpl struct {
 
 // ResultHandler interface for handling collection results
 type ResultHandler interface {
-       HandleCollectData(data *jobtypes.CollectRepMetricsData, job 
*jobtypes.Job) error
+       HandleCollectData(data *job.CollectRepMetricsData, job *job.Job) error
 }
 
 // NewResultHandler creates a new result handler
@@ -47,7 +47,7 @@ func NewResultHandler(logger logger.Logger, messageRouter 
MessageRouter) ResultH
 }
 
 // HandleCollectData processes the collection results
-func (rh *ResultHandlerImpl) HandleCollectData(data 
*jobtypes.CollectRepMetricsData, job *jobtypes.Job) error {
+func (rh *ResultHandlerImpl) HandleCollectData(data 
*job.CollectRepMetricsData, job *job.Job) error {
        if data == nil {
                rh.logger.Error(nil, "collect data is nil")
                return fmt.Errorf("collect data is nil")
diff --git a/internal/collector/common/collect/strategy/strategy_factory.go 
b/internal/job/collect/strategy/strategy_factory.go
similarity index 99%
rename from internal/collector/common/collect/strategy/strategy_factory.go
rename to internal/job/collect/strategy/strategy_factory.go
index 9fc9af1..a118aa4 100644
--- a/internal/collector/common/collect/strategy/strategy_factory.go
+++ b/internal/job/collect/strategy/strategy_factory.go
@@ -21,7 +21,7 @@ import (
        "fmt"
        "sync"
 
-       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
diff --git a/internal/collector/common/dispatcher/common_dispatcher.go 
b/internal/job/dispatcher/common_dispatcher.go
similarity index 98%
rename from internal/collector/common/dispatcher/common_dispatcher.go
rename to internal/job/dispatcher/common_dispatcher.go
index 288e0c1..eb6afcc 100644
--- a/internal/collector/common/dispatcher/common_dispatcher.go
+++ b/internal/job/dispatcher/common_dispatcher.go
@@ -25,8 +25,8 @@ import (
        "sync"
        "time"
 
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect"
-       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
 )
diff --git a/internal/collector/basic/ftp/.keep 
b/internal/job/dispatcher/entrance/.keep
similarity index 100%
rename from internal/collector/basic/ftp/.keep
rename to internal/job/dispatcher/entrance/.keep
diff --git a/internal/collector/basic/icmp/.keep 
b/internal/job/dispatcher/exporter/.keep
similarity index 100%
rename from internal/collector/basic/icmp/.keep
rename to internal/job/dispatcher/exporter/.keep
diff --git a/internal/collector/common/dispatcher/hashed_wheel_timer.go 
b/internal/job/dispatcher/hashed_wheel_timer.go
similarity index 99%
rename from internal/collector/common/dispatcher/hashed_wheel_timer.go
rename to internal/job/dispatcher/hashed_wheel_timer.go
index 9100205..c65047d 100644
--- a/internal/collector/common/dispatcher/hashed_wheel_timer.go
+++ b/internal/job/dispatcher/hashed_wheel_timer.go
@@ -25,7 +25,7 @@ import (
        "sync/atomic"
        "time"
 
-       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
diff --git a/internal/collector/common/dispatcher/time_wheel.go 
b/internal/job/dispatcher/time_wheel.go
similarity index 95%
rename from internal/collector/common/dispatcher/time_wheel.go
rename to internal/job/dispatcher/time_wheel.go
index d8c7aa1..a93df77 100644
--- a/internal/collector/common/dispatcher/time_wheel.go
+++ b/internal/job/dispatcher/time_wheel.go
@@ -25,14 +25,14 @@ import (
        "sync"
        "time"
 
-       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
 // JobInfo holds information about a scheduled job
 type JobInfo struct {
-       Job             *jobtypes.Job
-       Timeout         *jobtypes.Timeout
+       Job             *job.Job
+       Timeout         *job.Timeout
        CreatedAt       time.Time
        LastExecutedAt  time.Time
        NextExecutionAt time.Time
@@ -70,12 +70,12 @@ type DispatcherStats struct {
 
 // MetricsTaskDispatcher interface for metrics task dispatching
 type MetricsTaskDispatcher interface {
-       DispatchMetricsTask(ctx context.Context, job *jobtypes.Job, timeout 
*jobtypes.Timeout) error
+       DispatchMetricsTask(ctx context.Context, job *job.Job, timeout 
*job.Timeout) error
 }
 
 // HashedWheelTimer interface for time wheel operations
 type HashedWheelTimer interface {
-       NewTimeout(task jobtypes.TimerTask, delay time.Duration) 
*jobtypes.Timeout
+       NewTimeout(task job.TimerTask, delay time.Duration) *job.Timeout
        Start(ctx context.Context) error
        Stop() error
        GetStats() map[string]interface{}
@@ -101,7 +101,7 @@ func NewTimeDispatch(logger logger.Logger, commonDispatcher 
MetricsTaskDispatche
 
 // AddJob adds a job to the time-based scheduler
 // This corresponds to Java's TimerDispatcher.addJob method
-func (td *TimeDispatch) AddJob(job *jobtypes.Job) error {
+func (td *TimeDispatch) AddJob(job *job.Job) error {
        if job == nil {
                return fmt.Errorf("job cannot be nil")
        }
@@ -172,7 +172,7 @@ func (td *TimeDispatch) RemoveJob(jobID int64) error {
 
        // Try to remove from cyclic tasks
        if timeoutInterface, exists := td.cyclicTasks.LoadAndDelete(jobID); 
exists {
-               if timeout, ok := timeoutInterface.(*jobtypes.Timeout); ok {
+               if timeout, ok := timeoutInterface.(*job.Timeout); ok {
                        timeout.Cancel()
                        td.jobInfos.Delete(jobID)
                        td.logger.V(1).Info("removed cyclic job", "jobID", 
jobID)
@@ -182,7 +182,7 @@ func (td *TimeDispatch) RemoveJob(jobID int64) error {
 
        // Try to remove from temp tasks
        if timeoutInterface, exists := td.tempTasks.LoadAndDelete(jobID); 
exists {
-               if timeout, ok := timeoutInterface.(*jobtypes.Timeout); ok {
+               if timeout, ok := timeoutInterface.(*job.Timeout); ok {
                        timeout.Cancel()
                        td.jobInfos.Delete(jobID)
                        td.logger.V(1).Info("removed one-time job", "jobID", 
jobID)
@@ -233,14 +233,14 @@ func (td *TimeDispatch) Stop() error {
 
        // Cancel all running tasks
        td.cyclicTasks.Range(func(key, value interface{}) bool {
-               if timeout, ok := value.(*jobtypes.Timeout); ok {
+               if timeout, ok := value.(*job.Timeout); ok {
                        timeout.Cancel()
                }
                return true
        })
 
        td.tempTasks.Range(func(key, value interface{}) bool {
-               if timeout, ok := value.(*jobtypes.Timeout); ok {
+               if timeout, ok := value.(*job.Timeout); ok {
                        timeout.Cancel()
                }
                return true
@@ -339,7 +339,7 @@ func (td *TimeDispatch) UpdateJobExecution(jobID int64) {
 }
 
 // RescheduleJob reschedules a cyclic job for the next execution
-func (td *TimeDispatch) RescheduleJob(job *jobtypes.Job) error {
+func (td *TimeDispatch) RescheduleJob(job *job.Job) error {
        if !job.IsCyclic || job.DefaultInterval <= 0 {
                return fmt.Errorf("job is not cyclable or has invalid interval")
        }
diff --git a/internal/collector/common/dispatcher/wheel_timer_task.go 
b/internal/job/dispatcher/wheel_timer_task.go
similarity index 87%
rename from internal/collector/common/dispatcher/wheel_timer_task.go
rename to internal/job/dispatcher/wheel_timer_task.go
index 657103a..09bfb18 100644
--- a/internal/collector/common/dispatcher/wheel_timer_task.go
+++ b/internal/job/dispatcher/wheel_timer_task.go
@@ -24,7 +24,7 @@ import (
        "fmt"
        "time"
 
-       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
@@ -34,20 +34,20 @@ type StatsRecorder interface {
        RecordJobCompleted()
        RecordJobFailed()
        UpdateJobExecution(jobID int64)
-       RescheduleJob(job *jobtypes.Job) error
+       RescheduleJob(job *job.Job) error
 }
 
 // WheelTimerTask represents a task that runs in the time wheel
 // This corresponds to Java's WheelTimerTask
 type WheelTimerTask struct {
-       job              *jobtypes.Job
+       job              *job.Job
        commonDispatcher MetricsTaskDispatcher
        statsRecorder    StatsRecorder
        logger           logger.Logger
 }
 
 // NewWheelTimerTask creates a new wheel timer task
-func NewWheelTimerTask(job *jobtypes.Job, commonDispatcher 
MetricsTaskDispatcher, statsRecorder StatsRecorder, logger logger.Logger) 
*WheelTimerTask {
+func NewWheelTimerTask(job *job.Job, commonDispatcher MetricsTaskDispatcher, 
statsRecorder StatsRecorder, logger logger.Logger) *WheelTimerTask {
        return &WheelTimerTask{
                job:              job,
                commonDispatcher: commonDispatcher,
@@ -58,7 +58,7 @@ func NewWheelTimerTask(job *jobtypes.Job, commonDispatcher 
MetricsTaskDispatcher
 
 // Run executes the timer task
 // This corresponds to Java's WheelTimerTask.run method
-func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout) error {
+func (wtt *WheelTimerTask) Run(timeout *job.Timeout) error {
        if wtt.job == nil {
                wtt.logger.Error(nil, "job is nil, cannot execute")
                return fmt.Errorf("job is nil, cannot execute")
@@ -116,7 +116,7 @@ func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout) 
error {
 }
 
 // rescheduleJob reschedules a cyclic job for the next execution
-func (wtt *WheelTimerTask) rescheduleJob(timeout *jobtypes.Timeout) {
+func (wtt *WheelTimerTask) rescheduleJob(timeout *job.Timeout) {
        if wtt.job.DefaultInterval <= 0 {
                wtt.logger.Info("job has no valid interval, not rescheduling",
                        "jobID", wtt.job.ID)
@@ -134,6 +134,6 @@ func (wtt *WheelTimerTask) rescheduleJob(timeout 
*jobtypes.Timeout) {
 }
 
 // GetJob returns the associated job
-func (wtt *WheelTimerTask) GetJob() *jobtypes.Job {
+func (wtt *WheelTimerTask) GetJob() *job.Job {
        return wtt.job
 }
diff --git a/internal/collector/common/router/message_router.go 
b/internal/job/router/message_router.go
similarity index 93%
rename from internal/collector/common/router/message_router.go
rename to internal/job/router/message_router.go
index e3093ee..e03d9c2 100644
--- a/internal/collector/common/router/message_router.go
+++ b/internal/job/router/message_router.go
@@ -24,9 +24,9 @@ import (
        "fmt"
 
        pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/job"
-       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/server"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
+       job2 "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
@@ -50,7 +50,7 @@ type MessageRouter interface {
        RegisterProcessors()
 
        // SendResult sends collection results back to the manager
-       SendResult(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job) 
error
+       SendResult(data *job2.CollectRepMetricsData, job *job2.Job) error
 
        // GetIdentity returns the collector identity
        GetIdentity() string
@@ -60,7 +60,7 @@ type MessageRouter interface {
 type MessageRouterImpl struct {
        logger    logger.Logger
        client    transport.TransportClient
-       jobRunner *job.Runner
+       jobRunner *server.Runner
        identity  string
 }
 
@@ -68,7 +68,7 @@ type MessageRouterImpl struct {
 type Config struct {
        Logger    logger.Logger
        Client    transport.TransportClient
-       JobRunner *job.Runner
+       JobRunner *server.Runner
        Identity  string
 }
 
@@ -122,7 +122,7 @@ func (r *MessageRouterImpl) handleIssueCyclicTask(msg 
*pb.Message) (*pb.Message,
        r.logger.Info("Handling cyclic task message")
 
        // Parse job from message
-       var job jobtypes.Job
+       var job job2.Job
        if err := json.Unmarshal(msg.Msg, &job); err != nil {
                r.logger.Error(err, "failed to unmarshal job")
                return &pb.Message{
@@ -189,7 +189,7 @@ func (r *MessageRouterImpl) handleIssueOneTimeTask(msg 
*pb.Message) (*pb.Message
        r.logger.Info("Handling one-time task message")
 
        // Parse job from message
-       var job jobtypes.Job
+       var job job2.Job
        if err := json.Unmarshal(msg.Msg, &job); err != nil {
                r.logger.Error(err, "failed to unmarshal job")
                return &pb.Message{
@@ -223,7 +223,7 @@ func (r *MessageRouterImpl) handleIssueOneTimeTask(msg 
*pb.Message) (*pb.Message
 }
 
 // SendResult sends collection results back to the manager
-func (r *MessageRouterImpl) SendResult(data *jobtypes.CollectRepMetricsData, 
job *jobtypes.Job) error {
+func (r *MessageRouterImpl) SendResult(data *job2.CollectRepMetricsData, job 
*job2.Job) error {
        if r.client == nil || !r.client.IsStarted() {
                return fmt.Errorf("transport client not started")
        }
@@ -237,7 +237,7 @@ func (r *MessageRouterImpl) SendResult(data 
*jobtypes.CollectRepMetricsData, job
        }
 
        // Serialize metrics data
-       dataBytes, err := json.Marshal([]jobtypes.CollectRepMetricsData{*data})
+       dataBytes, err := json.Marshal([]job2.CollectRepMetricsData{*data})
        if err != nil {
                return fmt.Errorf("failed to marshal metrics data: %w", err)
        }
diff --git a/internal/collector/common/job/job_server.go 
b/internal/job/server/job_server.go
similarity index 92%
rename from internal/collector/common/job/job_server.go
rename to internal/job/server/job_server.go
index 4451ffa..0a47a0e 100644
--- a/internal/collector/common/job/job_server.go
+++ b/internal/job/server/job_server.go
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package job
+package server
 
 import (
        "context"
@@ -25,13 +25,13 @@ import (
        "fmt"
        "sync"
 
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect"
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/dispatch"
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/dispatcher"
-       clrserver 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/server"
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/collector"
-       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/dispatch"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/strategy"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/job/dispatcher"
+       clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
 )
 
 // TimeDispatcher interface defines time-based job scheduling
diff --git a/internal/collector/common/types/collector/collector_types.go 
b/internal/metrics/metrics.go
similarity index 91%
copy from internal/collector/common/types/collector/collector_types.go
copy to internal/metrics/metrics.go
index 4ab91e4..5e29267 100644
--- a/internal/collector/common/types/collector/collector_types.go
+++ b/internal/metrics/metrics.go
@@ -17,8 +17,4 @@
  * under the License.
  */
 
-package collector
-
-type Info struct {
-       Name string `json:"name" yaml:"name"`
-}
+package metrics
diff --git a/internal/collector/basic/database/jdbc_collector.go 
b/internal/protocol/basic/database/jdbc_collector.go
similarity index 98%
rename from internal/collector/basic/database/jdbc_collector.go
rename to internal/protocol/basic/database/jdbc_collector.go
index ec8f179..a84c6ac 100644
--- a/internal/collector/basic/database/jdbc_collector.go
+++ b/internal/protocol/basic/database/jdbc_collector.go
@@ -38,10 +38,10 @@ import (
        _ "github.com/microsoft/go-mssqldb"
        _ "github.com/sijms/go-ora/v2"
        "golang.org/x/crypto/ssh"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/strategy"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
+       protocol2 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol"
 
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
-       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
@@ -192,7 +192,7 @@ type sshTunnelHelper struct {
 // startSSHTunnel starts an SSH tunnel
 // It connects to the SSH bastion, listens on a local random port,
 // and forwards traffic to the target database.
-func startSSHTunnel(config *protocol.SSHTunnel, remoteHost, remotePort string, 
timeout time.Duration, log logger.Logger) (*sshTunnelHelper, string, string, 
error) {
+func startSSHTunnel(config *protocol2.SSHTunnel, remoteHost, remotePort 
string, timeout time.Duration, log logger.Logger) (*sshTunnelHelper, string, 
string, error) {
        sshHost := config.Host
        sshPort, err := strconv.Atoi(config.Port)
        if err != nil {
@@ -322,7 +322,7 @@ func NewJDBCCollector(logger logger.Logger) *JDBCCollector {
 }
 
 // extractJDBCConfig extracts JDBC configuration from interface{} type
-func extractJDBCConfig(jdbcInterface interface{}) (*protocol.JDBCProtocol, 
error) {
+func extractJDBCConfig(jdbcInterface interface{}) (*protocol2.JDBCProtocol, 
error) {
        replacer := param.NewReplacer()
        return replacer.ExtractJDBCConfig(jdbcInterface)
 }
@@ -404,7 +404,7 @@ func (jc *JDBCCollector) PreCheck(metrics 
*jobtypes.Metrics) error {
 }
 
 // checkTunnelParam validates SSH tunnel configuration
-func (jc *JDBCCollector) checkTunnelParam(config *protocol.SSHTunnel) error {
+func (jc *JDBCCollector) checkTunnelParam(config *protocol2.SSHTunnel) error {
        if config == nil {
                return nil
        }
@@ -610,7 +610,7 @@ func validateURL(rawURL string, platform string) error {
 }
 
 // constructDatabaseURL constructs the database connection URL/DSN
-func (jc *JDBCCollector) constructDatabaseURL(jdbc *protocol.JDBCProtocol, 
host, port string) (string, error) {
+func (jc *JDBCCollector) constructDatabaseURL(jdbc *protocol2.JDBCProtocol, 
host, port string) (string, error) {
        // 1. If user provided a full URL, use it (already validated in 
PreCheck)
        if jdbc.URL != "" {
                // Validate again to prevent bypassing PreCheck
diff --git a/internal/collector/basic/database/jdbc_collector_test.go 
b/internal/protocol/basic/database/jdbc_collector_test.go
similarity index 100%
rename from internal/collector/basic/database/jdbc_collector_test.go
rename to internal/protocol/basic/database/jdbc_collector_test.go
diff --git a/internal/collector/basic/imap/.keep 
b/internal/protocol/basic/dns/.keep
similarity index 100%
rename from internal/collector/basic/imap/.keep
rename to internal/protocol/basic/dns/.keep
diff --git a/internal/collector/basic/ipmi2/.keep 
b/internal/protocol/basic/ftp/.keep
similarity index 100%
rename from internal/collector/basic/ipmi2/.keep
rename to internal/protocol/basic/ftp/.keep
diff --git a/internal/protocol/basic/http/http_collector.go 
b/internal/protocol/basic/http/http_collector.go
new file mode 100644
index 0000000..11a8b9b
--- /dev/null
+++ b/internal/protocol/basic/http/http_collector.go
@@ -0,0 +1,596 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package http
+
+import (
+       "bytes"
+       "crypto/md5"
+       "crypto/rand"
+       "crypto/tls"
+       "encoding/hex"
+       "encoding/json"
+       "fmt"
+       "io"
+       "net/http"
+       "strconv"
+       "strings"
+       "time"
+
+       "github.com/prometheus/common/expfmt"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/strategy"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+func init() {
+       strategy.RegisterFactory(ProtocolHTTP, func(logger logger.Logger) 
strategy.Collector {
+               return NewHTTPCollector(logger)
+       })
+}
+
+const (
+       ProtocolHTTP = "http"
+
+       // Parse Types
+       ParseTypeDefault    = "default"
+       ParseTypeJsonPath   = "jsonPath"
+       ParseTypePrometheus = "prometheus"
+       ParseTypeWebsite    = "website"
+       ParseTypeHeader     = "header"
+
+       // Auth Types
+       AuthTypeBasic  = "Basic Auth"
+       AuthTypeBearer = "Bearer Token"
+       AuthTypeDigest = "Digest Auth"
+)
+
+type HTTPCollector struct {
+       logger logger.Logger
+}
+
+func NewHTTPCollector(log logger.Logger) *HTTPCollector {
+       return &HTTPCollector{
+               logger: log.WithName("http-collector"),
+       }
+}
+
+func (hc *HTTPCollector) Protocol() string {
+       return ProtocolHTTP
+}
+
+func (hc *HTTPCollector) PreCheck(metrics *job.Metrics) error {
+       if metrics == nil || metrics.HTTP == nil {
+               return fmt.Errorf("http configuration is missing")
+       }
+       return nil
+}
+
+func (hc *HTTPCollector) Collect(metrics *job.Metrics) 
*job.CollectRepMetricsData {
+       start := time.Now()
+
+       // MetricsCollector has already performed parameter replacement on 
metrics.HTTP
+       httpConfig := metrics.HTTP
+
+       // 1. Prepare URL
+       targetURL := httpConfig.URL
+
+       // Parse SSL bool string
+       isSSL := false
+       if httpConfig.SSL != "" {
+               if val, err := strconv.ParseBool(httpConfig.SSL); err == nil {
+                       isSSL = val
+               }
+       }
+
+       if targetURL == "" {
+               schema := "http"
+               if isSSL {
+                       schema = "https"
+               }
+               // Use metrics.ConfigMap values if URL is empty
+               // Note: metrics.ConfigMap is already populated with 
job.Configmap values
+               host := metrics.ConfigMap["host"]
+               port := metrics.ConfigMap["port"]
+               if host != "" && port != "" {
+                       targetURL = fmt.Sprintf("%s://%s:%s", schema, host, 
port)
+               }
+       }
+
+       if targetURL != "" && !strings.HasPrefix(targetURL, "http") {
+               schema := "http"
+               if isSSL {
+                       schema = "https"
+               }
+               targetURL = fmt.Sprintf("%s://%s", schema, targetURL)
+       }
+
+       if targetURL == "" {
+               return hc.createFailResponse(metrics, constants.CollectFail, 
"target URL is empty")
+       }
+
+       // 2. Create Request
+       req, err := hc.createRequest(httpConfig, targetURL)
+       if err != nil {
+               return hc.createFailResponse(metrics, constants.CollectFail, 
fmt.Sprintf("failed to create request: %v", err))
+       }
+
+       // 3. Create Client
+       timeoutStr := metrics.Timeout
+       if httpConfig.Timeout != "" {
+               timeoutStr = httpConfig.Timeout
+       }
+
+       timeout := hc.getTimeout(timeoutStr)
+       client := &http.Client{
+               Timeout: timeout,
+               Transport: &http.Transport{
+                       TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+               },
+       }
+
+       // 4. Execute Request (with Digest retry logic)
+       resp, err := client.Do(req)
+
+       // Handle Digest Auth Challenge (401)
+       if err == nil && resp.StatusCode == http.StatusUnauthorized &&
+               httpConfig.Authorization != nil &&
+               strings.EqualFold(httpConfig.Authorization.Type, 
AuthTypeDigest) {
+
+               // Close the first response body
+               io.Copy(io.Discard, resp.Body)
+               resp.Body.Close()
+
+               authHeader := resp.Header.Get("WWW-Authenticate")
+               if authHeader != "" {
+                       // Create new request with Authorization header
+                       digestReq, digestErr := hc.handleDigestAuth(req, 
authHeader, httpConfig.Authorization)
+                       if digestErr == nil {
+                               // Retry request
+                               resp, err = client.Do(digestReq)
+                       } else {
+                               hc.logger.Error(digestErr, "failed to handle 
digest auth")
+                       }
+               }
+       }
+
+       if err != nil {
+               return hc.createFailResponse(metrics, 
constants.CollectUnReachable, fmt.Sprintf("request failed: %v", err))
+       }
+       defer resp.Body.Close()
+
+       responseTime := time.Since(start).Milliseconds()
+
+       // 5. Parse Response
+       parseType := httpConfig.ParseType
+       if parseType == "" {
+               parseType = ParseTypeDefault
+       }
+
+       var responseData *job.CollectRepMetricsData
+
+       // Read body
+       bodyBytes, err := io.ReadAll(resp.Body)
+       if err != nil {
+               return hc.createFailResponse(metrics, constants.CollectFail, 
fmt.Sprintf("failed to read body: %v", err))
+       }
+
+       switch parseType {
+       case ParseTypePrometheus:
+               responseData, err = hc.parsePrometheus(bodyBytes, metrics)
+       case ParseTypeWebsite:
+               responseData, err = hc.parseWebsite(bodyBytes, resp.StatusCode, 
responseTime, metrics, httpConfig)
+       case ParseTypeHeader:
+               responseData, err = hc.parseHeader(resp.Header, metrics)
+       case ParseTypeJsonPath, ParseTypeDefault:
+               responseData, err = hc.parseJsonPath(bodyBytes, metrics, 
responseTime, httpConfig)
+       default:
+               responseData, err = hc.parseJsonPath(bodyBytes, metrics, 
responseTime, httpConfig)
+       }
+
+       if err != nil {
+               hc.logger.Error(err, "parse response failed", "type", parseType)
+               return hc.createFailResponse(metrics, constants.CollectFail, 
fmt.Sprintf("parse error: %v", err))
+       }
+
+       hc.fillCommonFields(responseData, metrics.AliasFields, responseTime, 
resp.StatusCode)
+
+       return responseData
+}
+
+// handleDigestAuth generates a new request with the Digest Authorization 
header
+func (hc *HTTPCollector) handleDigestAuth(originalReq *http.Request, 
authHeader string, authConfig *protocol.Authorization) (*http.Request, error) {
+       params := hc.parseAuthHeader(authHeader)
+       realm := params["realm"]
+       nonce := params["nonce"]
+       qop := params["qop"]
+       opaque := params["opaque"]
+       algorithm := params["algorithm"]
+       if algorithm == "" {
+               algorithm = "MD5"
+       }
+
+       if realm == "" || nonce == "" {
+               return nil, fmt.Errorf("missing realm or nonce in digest 
header")
+       }
+
+       username := authConfig.DigestAuthUsername
+       password := authConfig.DigestAuthPassword
+       ha1 := hc.md5Hex(fmt.Sprintf("%s:%s:%s", username, realm, password))
+
+       method := originalReq.Method
+       uri := originalReq.URL.RequestURI()
+       ha2 := hc.md5Hex(fmt.Sprintf("%s:%s", method, uri))
+
+       nc := "00000001"
+       cnonce := hc.generateCnonce()
+
+       var responseStr string
+       if qop == "" {
+               responseStr = hc.md5Hex(fmt.Sprintf("%s:%s:%s", ha1, nonce, 
ha2))
+       } else if strings.Contains(qop, "auth") {
+               responseStr = hc.md5Hex(fmt.Sprintf("%s:%s:%s:%s:%s:%s", ha1, 
nonce, nc, cnonce, "auth", ha2))
+       } else {
+               return nil, fmt.Errorf("unsupported qop: %s", qop)
+       }
+
+       authVal := fmt.Sprintf(`Digest username="%s", realm="%s", nonce="%s", 
uri="%s", response="%s", algorithm="%s"`,
+               username, realm, nonce, uri, responseStr, algorithm)
+
+       if opaque != "" {
+               authVal += fmt.Sprintf(`, opaque="%s"`, opaque)
+       }
+       if qop != "" {
+               authVal += fmt.Sprintf(`, qop=auth, nc=%s, cnonce="%s"`, nc, 
cnonce)
+       }
+
+       var newReq *http.Request
+       if originalReq.GetBody != nil {
+               body, _ := originalReq.GetBody()
+               newReq, _ = http.NewRequest(originalReq.Method, 
originalReq.URL.String(), body)
+       } else {
+               newReq, _ = http.NewRequest(originalReq.Method, 
originalReq.URL.String(), nil)
+       }
+
+       for k, v := range originalReq.Header {
+               newReq.Header[k] = v
+       }
+
+       newReq.Header.Set("Authorization", authVal)
+       return newReq, nil
+}
+
+func (hc *HTTPCollector) parseAuthHeader(header string) map[string]string {
+       header = strings.TrimPrefix(header, "Digest ")
+       result := make(map[string]string)
+
+       pairs := strings.Split(header, ",")
+       for _, pair := range pairs {
+               parts := strings.SplitN(strings.TrimSpace(pair), "=", 2)
+               if len(parts) == 2 {
+                       key := strings.TrimSpace(parts[0])
+                       val := strings.TrimSpace(parts[1])
+                       val = strings.Trim(val, "\"")
+                       result[key] = val
+               }
+       }
+       return result
+}
+
+func (hc *HTTPCollector) md5Hex(s string) string {
+       hash := md5.Sum([]byte(s))
+       return hex.EncodeToString(hash[:])
+}
+
+func (hc *HTTPCollector) generateCnonce() string {
+       b := make([]byte, 8)
+       io.ReadFull(rand.Reader, b)
+       return hex.EncodeToString(b)
+}
+
+func (hc *HTTPCollector) createRequest(config *protocol.HTTPProtocol, 
targetURL string) (*http.Request, error) {
+       method := strings.ToUpper(config.Method)
+       if method == "" {
+               method = "GET"
+       }
+
+       var body io.Reader
+       if config.Body != "" {
+               body = strings.NewReader(config.Body)
+       }
+
+       req, err := http.NewRequest(method, targetURL, body)
+       if err != nil {
+               return nil, err
+       }
+
+       for k, v := range config.Headers {
+               req.Header.Set(k, v)
+       }
+
+       if config.Authorization != nil {
+               auth := config.Authorization
+               if auth.Type == "" || strings.EqualFold(auth.Type, 
AuthTypeBasic) {
+                       if auth.BasicAuthUsername != "" && 
auth.BasicAuthPassword != "" {
+                               req.SetBasicAuth(auth.BasicAuthUsername, 
auth.BasicAuthPassword)
+                       }
+               } else if strings.EqualFold(auth.Type, AuthTypeBearer) {
+                       if auth.BearerTokenToken != "" {
+                               req.Header.Set("Authorization", "Bearer 
"+auth.BearerTokenToken)
+                       }
+               }
+       }
+
+       if len(config.Params) > 0 {
+               q := req.URL.Query()
+               for k, v := range config.Params {
+                       q.Add(k, v)
+               }
+               req.URL.RawQuery = q.Encode()
+       }
+
+       return req, nil
+}
+
+// --- Parsers ---
+
+func (hc *HTTPCollector) parsePrometheus(body []byte, metrics *job.Metrics) 
(*job.CollectRepMetricsData, error) {
+       response := hc.createSuccessResponse(metrics)
+       var parser expfmt.TextParser
+       metricFamilies, err := 
parser.TextToMetricFamilies(bytes.NewReader(body))
+       if err != nil {
+               return nil, err
+       }
+       now := time.Now().UnixMilli()
+       for _, mf := range metricFamilies {
+               for _, m := range mf.Metric {
+                       row := job.ValueRow{Columns: make([]string, 
len(metrics.AliasFields))}
+                       labels := make(map[string]string)
+                       for _, pair := range m.Label {
+                               labels[pair.GetName()] = pair.GetValue()
+                       }
+                       hasValue := false
+                       for i, field := range metrics.AliasFields {
+                               if field == constants.NULL_VALUE {
+                                       continue
+                               }
+                               if field == "value" || field == "prom_value" {
+                                       if m.Gauge != nil {
+                                               row.Columns[i] = 
fmt.Sprintf("%f", m.Gauge.GetValue())
+                                       } else if m.Counter != nil {
+                                               row.Columns[i] = 
fmt.Sprintf("%f", m.Counter.GetValue())
+                                       } else if m.Untyped != nil {
+                                               row.Columns[i] = 
fmt.Sprintf("%f", m.Untyped.GetValue())
+                                       }
+                                       hasValue = true
+                               } else {
+                                       if val, ok := labels[field]; ok {
+                                               row.Columns[i] = val
+                                               hasValue = true
+                                       } else {
+                                               row.Columns[i] = 
constants.NULL_VALUE
+                                       }
+                               }
+                       }
+                       if hasValue {
+                               response.Values = append(response.Values, row)
+                       }
+               }
+       }
+       response.Time = now
+       return response, nil
+}
+
+func (hc *HTTPCollector) parseWebsite(body []byte, statusCode int, 
responseTime int64, metrics *job.Metrics, httpConfig *protocol.HTTPProtocol) 
(*job.CollectRepMetricsData, error) {
+       response := hc.createSuccessResponse(metrics)
+       row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))}
+       keyword := httpConfig.Keyword
+       keywordCount := 0
+       if keyword != "" {
+               keywordCount = strings.Count(string(body), keyword)
+       }
+       for i, field := range metrics.AliasFields {
+               switch strings.ToLower(field) {
+               case strings.ToLower(constants.StatusCode):
+                       row.Columns[i] = strconv.Itoa(statusCode)
+               case strings.ToLower(constants.RESPONSE_TIME):
+                       row.Columns[i] = strconv.FormatInt(responseTime, 10)
+               case "keyword":
+                       row.Columns[i] = strconv.Itoa(keywordCount)
+               default:
+                       row.Columns[i] = constants.NULL_VALUE
+               }
+       }
+       response.Values = append(response.Values, row)
+       return response, nil
+}
+
+func (hc *HTTPCollector) parseHeader(header http.Header, metrics *job.Metrics) 
(*job.CollectRepMetricsData, error) {
+       response := hc.createSuccessResponse(metrics)
+       row := job.ValueRow{Columns: make([]string, len(metrics.AliasFields))}
+       for i, field := range metrics.AliasFields {
+               val := header.Get(field)
+               if val != "" {
+                       row.Columns[i] = val
+               } else {
+                       row.Columns[i] = constants.NULL_VALUE
+               }
+       }
+       response.Values = append(response.Values, row)
+       return response, nil
+}
+
+func (hc *HTTPCollector) parseJsonPath(body []byte, metrics *job.Metrics, 
responseTime int64, httpConfig *protocol.HTTPProtocol) 
(*job.CollectRepMetricsData, error) {
+       response := hc.createSuccessResponse(metrics)
+       var data interface{}
+       decoder := json.NewDecoder(bytes.NewReader(body))
+       decoder.UseNumber()
+       if err := decoder.Decode(&data); err != nil {
+               return response, nil
+       }
+       parseScript := httpConfig.ParseScript
+       root := data
+       if parseScript != "" && parseScript != "$" {
+               root = hc.navigateJson(data, parseScript)
+       }
+       if root == nil {
+               return response, nil
+       }
+       if arr, ok := root.([]interface{}); ok {
+               for _, item := range arr {
+                       row := hc.extractRow(item, metrics.AliasFields)
+                       response.Values = append(response.Values, row)
+               }
+       } else {
+               row := hc.extractRow(root, metrics.AliasFields)
+               response.Values = append(response.Values, row)
+       }
+       return response, nil
+}
+
+func (hc *HTTPCollector) navigateJson(data interface{}, path string) 
interface{} {
+       path = strings.TrimPrefix(path, "$.")
+       if path == "" {
+               return data
+       }
+       parts := strings.Split(path, ".")
+       current := data
+       for _, part := range parts {
+               if current == nil {
+                       return nil
+               }
+               key := part
+               idx := -1
+               if i := strings.Index(part, "["); i > -1 && 
strings.HasSuffix(part, "]") {
+                       key = part[:i]
+                       idxStr := part[i+1 : len(part)-1]
+                       if val, err := strconv.Atoi(idxStr); err == nil {
+                               idx = val
+                       }
+               }
+               if key != "" {
+                       if m, ok := current.(map[string]interface{}); ok {
+                               if val, exists := m[key]; exists {
+                                       current = val
+                               } else {
+                                       return nil
+                               }
+                       } else {
+                               return nil
+                       }
+               }
+               if idx > -1 {
+                       if arr, ok := current.([]interface{}); ok {
+                               if idx < len(arr) {
+                                       current = arr[idx]
+                               } else {
+                                       return nil
+                               }
+                       } else {
+                               return nil
+                       }
+               }
+       }
+       return current
+}
+
+func (hc *HTTPCollector) extractRow(item interface{}, fields []string) 
job.ValueRow {
+       row := job.ValueRow{Columns: make([]string, len(fields))}
+       m, isMap := item.(map[string]interface{})
+       for i, field := range fields {
+               if strings.EqualFold(field, constants.RESPONSE_TIME) || 
strings.EqualFold(field, constants.StatusCode) {
+                       row.Columns[i] = constants.NULL_VALUE
+                       continue
+               }
+               var val interface{}
+               if isMap {
+                       if v, ok := m[field]; ok {
+                               val = v
+                       } else {
+                               val = hc.navigateJson(item, field)
+                       }
+               }
+               if val != nil {
+                       row.Columns[i] = fmt.Sprintf("%v", val)
+               } else {
+                       row.Columns[i] = constants.NULL_VALUE
+               }
+       }
+       return row
+}
+
+func (hc *HTTPCollector) fillCommonFields(resp *job.CollectRepMetricsData, 
fields []string, responseTime int64, statusCode int) {
+       if resp == nil {
+               return
+       }
+       if len(resp.Fields) == 0 {
+               resp.Fields = make([]job.Field, len(fields))
+               for i, f := range fields {
+                       resp.Fields[i] = job.Field{Field: f, Type: 
constants.TYPE_STRING}
+               }
+       }
+       if len(resp.Values) == 0 {
+               row := job.ValueRow{Columns: make([]string, len(fields))}
+               for k := range row.Columns {
+                       row.Columns[k] = constants.NULL_VALUE
+               }
+               resp.Values = append(resp.Values, row)
+       }
+       for i := range resp.Values {
+               for j, field := range fields {
+                       if strings.EqualFold(field, constants.RESPONSE_TIME) {
+                               resp.Values[i].Columns[j] = 
strconv.FormatInt(responseTime, 10)
+                       }
+                       if strings.EqualFold(field, constants.StatusCode) {
+                               resp.Values[i].Columns[j] = 
strconv.Itoa(statusCode)
+                       }
+               }
+       }
+}
+
+func (hc *HTTPCollector) getTimeout(timeoutStr string) time.Duration {
+       if timeoutStr == "" {
+               return 10 * time.Second
+       }
+       if val, err := strconv.Atoi(timeoutStr); err == nil {
+               return time.Duration(val) * time.Millisecond
+       }
+       return 10 * time.Second
+}
+
+func (hc *HTTPCollector) createSuccessResponse(metrics *job.Metrics) 
*job.CollectRepMetricsData {
+       return &job.CollectRepMetricsData{
+               Metrics: metrics.Name,
+               Time:    time.Now().UnixMilli(),
+               Code:    constants.CollectSuccess,
+               Msg:     "success",
+               Values:  make([]job.ValueRow, 0),
+       }
+}
+
+func (hc *HTTPCollector) createFailResponse(metrics *job.Metrics, code int, 
msg string) *job.CollectRepMetricsData {
+       return &job.CollectRepMetricsData{
+               Metrics: metrics.Name,
+               Time:    time.Now().UnixMilli(),
+               Code:    code,
+               Msg:     msg,
+       }
+}
diff --git a/internal/collector/basic/memcached/.keep 
b/internal/protocol/basic/icmp/.keep
similarity index 100%
rename from internal/collector/basic/memcached/.keep
rename to internal/protocol/basic/icmp/.keep
diff --git a/internal/collector/basic/modbus/.keep 
b/internal/protocol/basic/imap/.keep
similarity index 100%
rename from internal/collector/basic/modbus/.keep
rename to internal/protocol/basic/imap/.keep
diff --git a/internal/collector/basic/mqtt/.keep 
b/internal/protocol/basic/ipmi2/.keep
similarity index 100%
rename from internal/collector/basic/mqtt/.keep
rename to internal/protocol/basic/ipmi2/.keep
diff --git a/internal/collector/basic/nginx/.keep 
b/internal/protocol/basic/memcached/.keep
similarity index 100%
rename from internal/collector/basic/nginx/.keep
rename to internal/protocol/basic/memcached/.keep
diff --git a/internal/collector/basic/ntp/.keep 
b/internal/protocol/basic/modbus/.keep
similarity index 100%
rename from internal/collector/basic/ntp/.keep
rename to internal/protocol/basic/modbus/.keep
diff --git a/internal/collector/basic/plc/.keep 
b/internal/protocol/basic/mqtt/.keep
similarity index 100%
rename from internal/collector/basic/plc/.keep
rename to internal/protocol/basic/mqtt/.keep
diff --git a/internal/collector/basic/pop3/.keep 
b/internal/protocol/basic/nginx/.keep
similarity index 100%
rename from internal/collector/basic/pop3/.keep
rename to internal/protocol/basic/nginx/.keep
diff --git a/internal/collector/basic/prometheus/.keep 
b/internal/protocol/basic/ntp/.keep
similarity index 100%
rename from internal/collector/basic/prometheus/.keep
rename to internal/protocol/basic/ntp/.keep
diff --git a/internal/collector/basic/push/.keep 
b/internal/protocol/basic/plc/.keep
similarity index 100%
rename from internal/collector/basic/push/.keep
rename to internal/protocol/basic/plc/.keep
diff --git a/internal/collector/basic/redfish/.keep 
b/internal/protocol/basic/pop3/.keep
similarity index 100%
rename from internal/collector/basic/redfish/.keep
rename to internal/protocol/basic/pop3/.keep
diff --git a/internal/collector/basic/s7/.keep 
b/internal/protocol/basic/prometheus/.keep
similarity index 100%
rename from internal/collector/basic/s7/.keep
rename to internal/protocol/basic/prometheus/.keep
diff --git a/internal/collector/basic/script/.keep 
b/internal/protocol/basic/push/.keep
similarity index 100%
rename from internal/collector/basic/script/.keep
rename to internal/protocol/basic/push/.keep
diff --git a/internal/collector/basic/sd/.keep 
b/internal/protocol/basic/redfish/.keep
similarity index 100%
rename from internal/collector/basic/sd/.keep
rename to internal/protocol/basic/redfish/.keep
diff --git a/internal/collector/basic/redis/redis_collector.go 
b/internal/protocol/basic/redis/redis_collector.go
similarity index 93%
rename from internal/collector/basic/redis/redis_collector.go
rename to internal/protocol/basic/redis/redis_collector.go
index 9b23741..8b592f4 100644
--- a/internal/collector/basic/redis/redis_collector.go
+++ b/internal/protocol/basic/redis/redis_collector.go
@@ -30,10 +30,10 @@ import (
 
        "github.com/redis/go-redis/v9"
        "golang.org/x/crypto/ssh"
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/strategy"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
+       protocol2 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol"
 
-       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
        consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
        sshhelper 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/ssh"
@@ -143,7 +143,7 @@ func (rc *RedisCollector) Collect(metrics 
*jobtypes.Metrics) *jobtypes.CollectRe
        return response
 }
 
-func (rc *RedisCollector) collectSingle(ctx context.Context, metrics 
*jobtypes.Metrics, config *protocol.RedisProtocol, dialer func(context.Context, 
string, string) (net.Conn, error), timeout time.Duration, response 
*jobtypes.CollectRepMetricsData, startTime time.Time) {
+func (rc *RedisCollector) collectSingle(ctx context.Context, metrics 
*jobtypes.Metrics, config *protocol2.RedisProtocol, dialer 
func(context.Context, string, string) (net.Conn, error), timeout time.Duration, 
response *jobtypes.CollectRepMetricsData, startTime time.Time) {
        opts := &redis.Options{
                Addr:        fmt.Sprintf("%s:%s", config.Host, config.Port),
                Username:    config.Username,
@@ -174,7 +174,7 @@ func (rc *RedisCollector) collectSingle(ctx 
context.Context, metrics *jobtypes.M
        rc.addValueRow(response, metrics.AliasFields, parseMap)
 }
 
-func (rc *RedisCollector) collectCluster(ctx context.Context, metrics 
*jobtypes.Metrics, config *protocol.RedisProtocol, dialer func(context.Context, 
string, string) (net.Conn, error), timeout time.Duration, response 
*jobtypes.CollectRepMetricsData, startTime time.Time) {
+func (rc *RedisCollector) collectCluster(ctx context.Context, metrics 
*jobtypes.Metrics, config *protocol2.RedisProtocol, dialer 
func(context.Context, string, string) (net.Conn, error), timeout time.Duration, 
response *jobtypes.CollectRepMetricsData, startTime time.Time) {
        opts := &redis.ClusterOptions{
                Addrs:       []string{fmt.Sprintf("%s:%s", config.Host, 
config.Port)},
                Username:    config.Username,
@@ -301,13 +301,13 @@ func (rc *RedisCollector) parseInfo(info string) 
map[string]string {
 }
 
 // createRedisDialer creates a dialer function that supports SSH tunneling
-func (rc *RedisCollector) createRedisDialer(sshTunnel *protocol.SSHTunnel) 
(func(context.Context, string, string) (net.Conn, error), error) {
+func (rc *RedisCollector) createRedisDialer(sshTunnel *protocol2.SSHTunnel) 
(func(context.Context, string, string) (net.Conn, error), error) {
        if sshTunnel == nil || sshTunnel.Enable != "true" {
                return nil, nil
        }
 
        // Create SSH config
-       sshConfig := &protocol.SSHProtocol{
+       sshConfig := &protocol2.SSHProtocol{
                Host:     sshTunnel.Host,
                Port:     sshTunnel.Port,
                Username: sshTunnel.Username,
diff --git a/internal/collector/basic/redis/redis_collector_test.go 
b/internal/protocol/basic/redis/redis_collector_test.go
similarity index 97%
rename from internal/collector/basic/redis/redis_collector_test.go
rename to internal/protocol/basic/redis/redis_collector_test.go
index 56f7b2c..d611a32 100644
--- a/internal/collector/basic/redis/redis_collector_test.go
+++ b/internal/protocol/basic/redis/redis_collector_test.go
@@ -26,10 +26,10 @@ import (
 
        "github.com/alicebob/miniredis/v2"
        "github.com/stretchr/testify/assert"
-       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
-       loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
        consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol"
+       loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
diff --git a/internal/collector/basic/smtp/.keep 
b/internal/protocol/basic/s7/.keep
similarity index 100%
rename from internal/collector/basic/smtp/.keep
rename to internal/protocol/basic/s7/.keep
diff --git a/internal/collector/basic/snmp/.keep 
b/internal/protocol/basic/script/.keep
similarity index 100%
rename from internal/collector/basic/snmp/.keep
rename to internal/protocol/basic/script/.keep
diff --git a/internal/collector/basic/telnet/.keep 
b/internal/protocol/basic/sd/.keep
similarity index 100%
rename from internal/collector/basic/telnet/.keep
rename to internal/protocol/basic/sd/.keep
diff --git a/internal/collector/basic/udp/.keep 
b/internal/protocol/basic/smtp/.keep
similarity index 100%
rename from internal/collector/basic/udp/.keep
rename to internal/protocol/basic/smtp/.keep
diff --git a/internal/collector/basic/websocket/.keep 
b/internal/protocol/basic/snmp/.keep
similarity index 100%
rename from internal/collector/basic/websocket/.keep
rename to internal/protocol/basic/snmp/.keep
diff --git a/internal/collector/basic/ssh/ssh_collector.go 
b/internal/protocol/basic/ssh/ssh_collector.go
similarity index 98%
rename from internal/collector/basic/ssh/ssh_collector.go
rename to internal/protocol/basic/ssh/ssh_collector.go
index a611ded..fcf2cd5 100644
--- a/internal/collector/basic/ssh/ssh_collector.go
+++ b/internal/protocol/basic/ssh/ssh_collector.go
@@ -26,10 +26,10 @@ import (
        "strings"
        "time"
 
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
-       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
        consts "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/strategy"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
        sshhelper 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/ssh"
diff --git a/internal/collector/common/cache/.keep 
b/internal/protocol/basic/telnet/.keep
similarity index 100%
rename from internal/collector/common/cache/.keep
rename to internal/protocol/basic/telnet/.keep
diff --git a/internal/collector/common/dispatcher/entrance/.keep 
b/internal/protocol/basic/udp/.keep
similarity index 100%
rename from internal/collector/common/dispatcher/entrance/.keep
rename to internal/protocol/basic/udp/.keep
diff --git a/internal/collector/common/dispatcher/exporter/.keep 
b/internal/protocol/basic/websocket/.keep
similarity index 100%
rename from internal/collector/common/dispatcher/exporter/.keep
rename to internal/protocol/basic/websocket/.keep
diff --git a/internal/collector/extension/kafka/.keep 
b/internal/protocol/extension/kafka/.keep
similarity index 100%
rename from internal/collector/extension/kafka/.keep
rename to internal/protocol/extension/kafka/.keep
diff --git a/internal/collector/extension/milvus/milvus_collector.go 
b/internal/protocol/extension/milvus/milvus_collector.go
similarity index 96%
rename from internal/collector/extension/milvus/milvus_collector.go
rename to internal/protocol/extension/milvus/milvus_collector.go
index 6f69317..8935ff1 100644
--- a/internal/collector/extension/milvus/milvus_collector.go
+++ b/internal/protocol/extension/milvus/milvus_collector.go
@@ -27,9 +27,9 @@ import (
        "time"
 
        "github.com/milvus-io/milvus-sdk-go/v2/client"
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect/strategy"
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/constants"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/job/collect/strategy"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
diff --git a/internal/collector/extension/milvus/milvus_collector_test.go 
b/internal/protocol/extension/milvus/milvus_collector_test.go
similarity index 51%
rename from internal/collector/extension/milvus/milvus_collector_test.go
rename to internal/protocol/extension/milvus/milvus_collector_test.go
index 137d4c3..7aa5433 100644
--- a/internal/collector/extension/milvus/milvus_collector_test.go
+++ b/internal/protocol/extension/milvus/milvus_collector_test.go
@@ -20,32 +20,32 @@
 package milvus
 
 import (
-  "os"
-  "testing"
+       "os"
+       "testing"
 
-  
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
-  
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
-  loggertypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
-  "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol"
+       loggertypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
 func TestMilvusCollector_Collect(t *testing.T) {
-  metrics := &job.Metrics{
-    Name: "milvus_test",
-    Milvus: &protocol.MilvusProtocol{
-      Host: "localhost",
-      Port: "19530",
-    },
-    AliasFields: []string{"version", "responseTime", "host", "port"},
-  }
+       metrics := &job.Metrics{
+               Name: "milvus_test",
+               Milvus: &protocol.MilvusProtocol{
+                       Host: "localhost",
+                       Port: "19530",
+               },
+               AliasFields: []string{"version", "responseTime", "host", 
"port"},
+       }
 
-  l := logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo)
-  collector := NewMilvusCollector(l)
-  result := collector.Collect(metrics)
+       l := logger.DefaultLogger(os.Stdout, loggertypes.LogLevelInfo)
+       collector := NewMilvusCollector(l)
+       result := collector.Collect(metrics)
 
-  if result.Code != 0 {
-    t.Logf("Collect failed: %s", result.Msg)
-  } else {
-    t.Logf("Collect success: %+v", result.Values)
-  }
+       if result.Code != 0 {
+               t.Logf("Collect failed: %s", result.Msg)
+       } else {
+               t.Logf("Collect success: %+v", result.Values)
+       }
 }
diff --git a/internal/collector/extension/mongodb/.keep 
b/internal/protocol/extension/mongodb/.keep
similarity index 100%
rename from internal/collector/extension/mongodb/.keep
rename to internal/protocol/extension/mongodb/.keep
diff --git a/internal/collector/extension/nebulagraph/.keep 
b/internal/protocol/extension/nebulagraph/.keep
similarity index 100%
rename from internal/collector/extension/nebulagraph/.keep
rename to internal/protocol/extension/nebulagraph/.keep
diff --git a/internal/collector/extension/rocketmq/.keep 
b/internal/protocol/extension/rocketmq/.keep
similarity index 100%
rename from internal/collector/extension/rocketmq/.keep
rename to internal/protocol/extension/rocketmq/.keep
diff --git a/internal/collector/basic/standard/imports.go 
b/internal/protocol/standard/imports.go
similarity index 67%
rename from internal/collector/basic/standard/imports.go
rename to internal/protocol/standard/imports.go
index 5eb3217..ae47d46 100644
--- a/internal/collector/basic/standard/imports.go
+++ b/internal/protocol/standard/imports.go
@@ -20,11 +20,11 @@
 package standard
 
 import (
-       _ 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/database"
-       _ 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/http"
-       _ 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/redis"
-       _ 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/basic/ssh"
+       _ 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/protocol/basic/database"
+       _ 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/protocol/basic/http"
+       _ 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/protocol/basic/redis"
+       _ 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/protocol/basic/ssh"
 
        // extensions
-       _ 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/extension/milvus"
+       _ 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/protocol/extension/milvus"
 )
diff --git a/internal/collector/common/server/server.go 
b/internal/server/server.go
similarity index 88%
rename from internal/collector/common/server/server.go
rename to internal/server/server.go
index 0994f9e..f06a5fe 100644
--- a/internal/collector/common/server/server.go
+++ b/internal/server/server.go
@@ -22,8 +22,8 @@ package server
 import (
        "io"
 
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/config"
-       logger2 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
+       logger2 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
diff --git a/internal/collector/common/server/server_test.go 
b/internal/server/server_test.go
similarity index 100%
rename from internal/collector/common/server/server_test.go
rename to internal/server/server_test.go
diff --git a/internal/transport/processors.go b/internal/transport/processors.go
index 408a0b5..d969f48 100644
--- a/internal/transport/processors.go
+++ b/internal/transport/processors.go
@@ -25,8 +25,8 @@ import (
        "time"
 
        pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
-       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
-       loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
+       loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/crypto"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
diff --git a/internal/transport/transport.go b/internal/transport/transport.go
new file mode 100644
index 0000000..807630a
--- /dev/null
+++ b/internal/transport/transport.go
@@ -0,0 +1,276 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package transport
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "os"
+
+       pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
+       config2 "hertzbeat.apache.org/hertzbeat-collector-go/internal/config"
+       clrserver "hertzbeat.apache.org/hertzbeat-collector-go/internal/server"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/collector"
+       configtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/config"
+       loggertypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+const (
+       // DefaultManagerAddr is the default manager server address (Java Netty 
default port)
+       DefaultManagerAddr = "127.0.0.1:1158"
+       // DefaultProtocol is the default communication protocol for Java 
compatibility
+       DefaultProtocol = "netty"
+       // DefaultMode is the default operation mode
+       DefaultMode = "public"
+       // DefaultIdentity is the default collector identity
+       DefaultIdentity = "collector-go"
+)
+
+type Runner struct {
+       Config       *configtypes.CollectorConfig
+       client       TransportClient
+       jobScheduler JobScheduler
+       clrserver.Server
+}
+
+func New(cfg *configtypes.CollectorConfig) *Runner {
+       return &Runner{
+               Config: cfg,
+               Server: clrserver.Server{
+                       Logger: logger.Logger{}, // Will be initialized 
properly in Start method
+               },
+       }
+}
+
+// SetJobScheduler sets the job scheduler for the transport runner
+func (r *Runner) SetJobScheduler(scheduler JobScheduler) {
+       r.jobScheduler = scheduler
+}
+
+// NewFromConfig creates a new transport runner from collector configuration
+func NewFromConfig(cfg *configtypes.CollectorConfig) *Runner {
+       if cfg == nil {
+               return nil
+       }
+       return New(cfg)
+}
+
+// NewFromEnv creates a new transport runner from environment variables
+func NewFromEnv() *Runner {
+       envLoader := config2.NewEnvConfigLoader()
+       cfg := envLoader.LoadFromEnv()
+       return NewFromConfig(cfg)
+}
+
+// NewFromUnifiedConfig creates a new transport runner using unified 
configuration loading
+// It loads from file first, then overrides with environment variables
+func NewFromUnifiedConfig(cfgPath string) (*Runner, error) {
+       unifiedLoader := config2.NewUnifiedConfigLoader(cfgPath)
+       cfg, err := unifiedLoader.Load()
+       if err != nil {
+               return nil, err
+       }
+       return NewFromConfig(cfg), nil
+}
+
+func (r *Runner) Start(ctx context.Context) error {
+       // 初始化 Logger 如果它还没有被设置
+       if r.Logger.IsZero() {
+               r.Logger = logger.DefaultLogger(os.Stdout, 
loggertypes.LogLevelInfo)
+       }
+       r.Logger = r.Logger.WithName(r.Info().Name).WithValues("runner", 
r.Info().Name)
+       r.Logger.Info("Starting transport client")
+
+       // 构建 server 地址
+       addr := fmt.Sprintf("%s:%s", r.Config.Collector.Manager.Host, 
r.Config.Collector.Manager.Port)
+       if addr == ":" {
+               // 如果配置为空,使用环境变量或默认值
+               if v := os.Getenv("MANAGER_ADDR"); v != "" {
+                       addr = v
+               } else {
+                       addr = DefaultManagerAddr
+               }
+       }
+
+       // 确定协议
+       protocol := r.Config.Collector.Manager.Protocol
+       if protocol == "" {
+               if v := os.Getenv("MANAGER_PROTOCOL"); v != "" {
+                       protocol = v
+               } else {
+                       protocol = DefaultProtocol
+               }
+       }
+
+       r.Logger.Info("Connecting to manager server", "addr", addr, "protocol", 
protocol)
+
+       // 创建客户端
+       factory := &TransportClientFactory{}
+       client, err := factory.CreateClient(protocol, addr)
+       if err != nil {
+               r.Logger.Error(err, "Failed to create transport client")
+               return err
+       }
+
+       // Set the identity on the client if it supports it
+       identity := r.Config.Collector.Identity
+       if identity == "" {
+               identity = DefaultIdentity
+       }
+
+       if nettyClient, ok := client.(*NettyClient); ok {
+               nettyClient.SetIdentity(identity)
+       }
+
+       r.client = client
+
+       // 设置事件处理器
+       switch c := client.(type) {
+       case *GrpcClient:
+               c.SetEventHandler(func(event Event) {
+                       switch event.Type {
+                       case EventConnected:
+                               r.Logger.Info("Connected to manager gRPC 
server", "addr", event.Address)
+                               go r.sendOnlineMessage()
+                       case EventDisconnected:
+                               r.Logger.Info("Disconnected from manager gRPC 
server", "addr", event.Address)
+                       case EventConnectFailed:
+                               r.Logger.Error(event.Error, "Failed to connect 
to manager gRPC server", "addr", event.Address)
+                       }
+               })
+               // Register processors with job scheduler
+               if r.jobScheduler != nil {
+                       RegisterDefaultProcessors(c, r.jobScheduler)
+                       r.Logger.Info("Registered gRPC processors with job 
scheduler")
+               } else {
+                       RegisterDefaultProcessors(c, nil)
+                       r.Logger.Info("Registered gRPC processors without job 
scheduler")
+               }
+       case *NettyClient:
+               c.SetEventHandler(func(event Event) {
+                       switch event.Type {
+                       case EventConnected:
+                               r.Logger.Info("Connected to manager netty 
server", "addr", event.Address)
+                               go r.sendOnlineMessage()
+                       case EventDisconnected:
+                               r.Logger.Info("Disconnected from manager netty 
server", "addr", event.Address)
+                       case EventConnectFailed:
+                               r.Logger.Error(event.Error, "Failed to connect 
to manager netty server", "addr", event.Address)
+                       }
+               })
+               // Register processors with job scheduler
+               if r.jobScheduler != nil {
+                       RegisterDefaultNettyProcessors(c, r.jobScheduler)
+                       r.Logger.Info("Registered netty processors with job 
scheduler")
+               } else {
+                       RegisterDefaultNettyProcessors(c, nil)
+                       r.Logger.Info("Registered netty processors without job 
scheduler")
+               }
+       }
+
+       if err := r.client.Start(); err != nil {
+               r.Logger.Error(err, "Failed to start transport client")
+               return err
+       }
+
+       // 创建新的context用于监控关闭信号
+       ctx, cancel := context.WithCancel(ctx)
+       defer cancel()
+
+       // 监听 ctx.Done 优雅关闭
+       go func() {
+               <-ctx.Done()
+               r.Logger.Info("Shutting down transport client...")
+               _ = r.client.Shutdown()
+       }()
+
+       // 阻塞直到 ctx.Done
+       <-ctx.Done()
+       return nil
+}
+
+func (r *Runner) sendOnlineMessage() {
+       if r.client != nil && r.client.IsStarted() {
+               // Use the configured identity
+               identity := r.Config.Collector.Identity
+               if identity == "" {
+                       identity = DefaultIdentity
+               }
+
+               // Create CollectorInfo JSON structure as expected by Java 
server
+               mode := r.Config.Collector.Mode
+               if mode == "" {
+                       mode = DefaultMode // Default mode as in Java version
+               }
+
+               collectorInfo := map[string]interface{}{
+                       "name":    identity,
+                       "ip":      "", // Let server detect IP
+                       "version": "1.0.0",
+                       "mode":    mode,
+               }
+
+               // Convert to JSON bytes
+               jsonData, err := json.Marshal(collectorInfo)
+               if err != nil {
+                       r.Logger.Error(err, "Failed to marshal collector info 
to JSON")
+                       return
+               }
+
+               onlineMsg := &pb.Message{
+                       Type:      pb.MessageType_GO_ONLINE,
+                       Direction: pb.Direction_REQUEST,
+                       Identity:  identity,
+                       Msg:       jsonData,
+               }
+
+               r.Logger.Info("Sending online message", "identity", identity, 
"type", onlineMsg.Type)
+
+               if err := r.client.SendMsg(onlineMsg); err != nil {
+                       r.Logger.Error(err, "Failed to send online message", 
"identity", identity)
+               } else {
+                       r.Logger.Info("Online message sent successfully", 
"identity", identity)
+               }
+       }
+}
+
+func (r *Runner) Info() collector.Info {
+       return collector.Info{
+               Name: "transport",
+       }
+}
+
+func (r *Runner) Close() error {
+       r.Logger.Info("transport close...")
+       if r.client != nil {
+               _ = r.client.Shutdown()
+       }
+       return nil
+}
+
+// GetClient returns the transport client (for testing and advanced usage)
+func (r *Runner) GetClient() TransportClient {
+       return r.client
+}
+
+// IsConnected returns whether the client is connected and started
+func (r *Runner) IsConnected() bool {
+       return r.client != nil && r.client.IsStarted()
+}
diff --git a/internal/collector/common/types/collector/collector_types.go 
b/internal/types/collector/collector_types.go
similarity index 100%
rename from internal/collector/common/types/collector/collector_types.go
rename to internal/types/collector/collector_types.go
diff --git a/internal/collector/common/types/config/config_types.go 
b/internal/types/config/config_types.go
similarity index 100%
rename from internal/collector/common/types/config/config_types.go
rename to internal/types/config/config_types.go
diff --git a/internal/collector/common/types/err/error_types.go 
b/internal/types/err/error_types.go
similarity index 100%
rename from internal/collector/common/types/err/error_types.go
rename to internal/types/err/error_types.go
diff --git a/internal/collector/common/types/job/job_types.go 
b/internal/types/job/job_types.go
similarity index 100%
rename from internal/collector/common/types/job/job_types.go
rename to internal/types/job/job_types.go
diff --git a/internal/collector/common/types/job/metrics_types.go 
b/internal/types/job/metrics_types.go
similarity index 88%
rename from internal/collector/common/types/job/metrics_types.go
rename to internal/types/job/metrics_types.go
index ea591f3..207bf07 100644
--- a/internal/collector/common/types/job/metrics_types.go
+++ b/internal/types/job/metrics_types.go
@@ -21,7 +21,7 @@ import (
        "fmt"
        "time"
 
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
+       protocol2 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol"
 )
 
 // Metrics represents a metric configuration
@@ -46,14 +46,14 @@ type Metrics struct {
        HasSubTask  bool              `json:"hasSubTask"`
 
        // Protocol specific fields
-       HTTP    *protocol.HTTPProtocol    `json:"http,omitempty"`
-       SSH     *protocol.SSHProtocol     `json:"ssh,omitempty"`
-       JDBC    *protocol.JDBCProtocol    `json:"jdbc,omitempty"`
-       SNMP    *protocol.SNMPProtocol    `json:"snmp,omitempty"`
-       JMX     *protocol.JMXProtocol     `json:"jmx,omitempty"`
-       Redis   *protocol.RedisProtocol   `json:"redis,omitempty"`
-       MongoDB *protocol.MongoDBProtocol `json:"mongodb,omitempty"`
-       Milvus  *protocol.MilvusProtocol  `json:"milvus,omitempty"`
+       HTTP    *protocol2.HTTPProtocol    `json:"http,omitempty"`
+       SSH     *protocol2.SSHProtocol     `json:"ssh,omitempty"`
+       JDBC    *protocol2.JDBCProtocol    `json:"jdbc,omitempty"`
+       SNMP    *protocol2.SNMPProtocol    `json:"snmp,omitempty"`
+       JMX     *protocol2.JMXProtocol     `json:"jmx,omitempty"`
+       Redis   *protocol2.RedisProtocol   `json:"redis,omitempty"`
+       MongoDB *protocol2.MongoDBProtocol `json:"mongodb,omitempty"`
+       Milvus  *protocol2.MilvusProtocol  `json:"milvus,omitempty"`
 }
 
 // Field represents a metric field
@@ -218,18 +218,18 @@ type SSHProtocol struct {
 
 // JDBCProtocol represents JDBC protocol configuration
 type JDBCProtocol struct {
-       Host            string              `json:"host"`
-       Port            string              `json:"port"`
-       Platform        string              `json:"platform"`
-       Username        string              `json:"username"`
-       Password        string              `json:"password"`
-       Database        string              `json:"database"`
-       Timeout         string              `json:"timeout"`
-       QueryType       string              `json:"queryType"`
-       SQL             string              `json:"sql"`
-       URL             string              `json:"url"`
-       ReuseConnection string              `json:"reuseConnection"`
-       SSHTunnel       *protocol.SSHTunnel `json:"sshTunnel,omitempty"`
+       Host            string               `json:"host"`
+       Port            string               `json:"port"`
+       Platform        string               `json:"platform"`
+       Username        string               `json:"username"`
+       Password        string               `json:"password"`
+       Database        string               `json:"database"`
+       Timeout         string               `json:"timeout"`
+       QueryType       string               `json:"queryType"`
+       SQL             string               `json:"sql"`
+       URL             string               `json:"url"`
+       ReuseConnection string               `json:"reuseConnection"`
+       SSHTunnel       *protocol2.SSHTunnel `json:"sshTunnel,omitempty"`
 }
 
 // SNMPProtocol represents SNMP protocol configuration
@@ -262,13 +262,13 @@ type JMXProtocol struct {
 
 // RedisProtocol represents Redis protocol configuration
 type RedisProtocol struct {
-       Host      string              `json:"host"`
-       Port      string              `json:"port"`
-       Username  string              `json:"username"`
-       Password  string              `json:"password"`
-       Pattern   string              `json:"pattern"`
-       Timeout   string              `json:"timeout"`
-       SSHTunnel *protocol.SSHTunnel `json:"sshTunnel,omitempty"`
+       Host      string               `json:"host"`
+       Port      string               `json:"port"`
+       Username  string               `json:"username"`
+       Password  string               `json:"password"`
+       Pattern   string               `json:"pattern"`
+       Timeout   string               `json:"timeout"`
+       SSHTunnel *protocol2.SSHTunnel `json:"sshTunnel,omitempty"`
 }
 
 // MongoDBProtocol represents MongoDB protocol configuration
diff --git 
a/internal/collector/common/types/job/protocol/common_request_protocol.go 
b/internal/types/job/protocol/common_request_protocol.go
similarity index 100%
rename from 
internal/collector/common/types/job/protocol/common_request_protocol.go
rename to internal/types/job/protocol/common_request_protocol.go
diff --git a/internal/collector/common/types/job/protocol/consul_sd_protocol.go 
b/internal/types/job/protocol/consul_sd_protocol.go
similarity index 100%
rename from internal/collector/common/types/job/protocol/consul_sd_protocol.go
rename to internal/types/job/protocol/consul_sd_protocol.go
diff --git a/internal/collector/common/types/job/protocol/http_protocol.go 
b/internal/types/job/protocol/http_protocol.go
similarity index 100%
rename from internal/collector/common/types/job/protocol/http_protocol.go
rename to internal/types/job/protocol/http_protocol.go
diff --git a/internal/collector/common/types/job/protocol/jdbc_protocol.go 
b/internal/types/job/protocol/jdbc_protocol.go
similarity index 100%
rename from internal/collector/common/types/job/protocol/jdbc_protocol.go
rename to internal/types/job/protocol/jdbc_protocol.go
diff --git a/internal/collector/common/types/job/protocol/jmx_protocol.go 
b/internal/types/job/protocol/jmx_protocol.go
similarity index 100%
rename from internal/collector/common/types/job/protocol/jmx_protocol.go
rename to internal/types/job/protocol/jmx_protocol.go
diff --git a/internal/collector/common/types/job/protocol/milvus_protocol.go 
b/internal/types/job/protocol/milvus_protocol.go
similarity index 100%
rename from internal/collector/common/types/job/protocol/milvus_protocol.go
rename to internal/types/job/protocol/milvus_protocol.go
diff --git a/internal/collector/common/types/job/protocol/mongodb_protocol.go 
b/internal/types/job/protocol/mongodb_protocol.go
similarity index 100%
rename from internal/collector/common/types/job/protocol/mongodb_protocol.go
rename to internal/types/job/protocol/mongodb_protocol.go
diff --git a/internal/collector/common/types/job/protocol/redis_protocol.go 
b/internal/types/job/protocol/redis_protocol.go
similarity index 100%
rename from internal/collector/common/types/job/protocol/redis_protocol.go
rename to internal/types/job/protocol/redis_protocol.go
diff --git a/internal/collector/common/types/job/protocol/snmp_protocol.go 
b/internal/types/job/protocol/snmp_protocol.go
similarity index 100%
rename from internal/collector/common/types/job/protocol/snmp_protocol.go
rename to internal/types/job/protocol/snmp_protocol.go
diff --git a/internal/collector/common/types/job/protocol/ssh_protocol.go 
b/internal/types/job/protocol/ssh_protocol.go
similarity index 100%
rename from internal/collector/common/types/job/protocol/ssh_protocol.go
rename to internal/types/job/protocol/ssh_protocol.go
diff --git a/internal/collector/common/types/job/protocol/ssh_tunnel.go 
b/internal/types/job/protocol/ssh_tunnel.go
similarity index 100%
rename from internal/collector/common/types/job/protocol/ssh_tunnel.go
rename to internal/types/job/protocol/ssh_tunnel.go
diff --git 
a/internal/collector/common/types/job/protocol/zookeeper_sd_protocol.go 
b/internal/types/job/protocol/zookeeper_sd_protocol.go
similarity index 100%
rename from 
internal/collector/common/types/job/protocol/zookeeper_sd_protocol.go
rename to internal/types/job/protocol/zookeeper_sd_protocol.go
diff --git a/internal/collector/common/types/job/timeout_types.go 
b/internal/types/job/timeout_types.go
similarity index 100%
rename from internal/collector/common/types/job/timeout_types.go
rename to internal/types/job/timeout_types.go
diff --git a/internal/collector/common/types/logger/logging_types.go 
b/internal/types/logger/logging_types.go
similarity index 100%
rename from internal/collector/common/types/logger/logging_types.go
rename to internal/types/logger/logging_types.go
diff --git a/internal/util/arrow/arrow_serializer.go 
b/internal/util/arrow/arrow_serializer.go
index 1915b43..0b78f5a 100644
--- a/internal/util/arrow/arrow_serializer.go
+++ b/internal/util/arrow/arrow_serializer.go
@@ -24,7 +24,7 @@ import (
        "encoding/json"
        "fmt"
 
-       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
diff --git a/internal/util/crypto/aes_util.go b/internal/util/crypto/aes_util.go
index 678760f..1567c56 100644
--- a/internal/util/crypto/aes_util.go
+++ b/internal/util/crypto/aes_util.go
@@ -26,7 +26,7 @@ import (
        "sync"
        "unicode"
 
-       loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+       loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
diff --git a/internal/util/logger/logger.go b/internal/util/logger/logger.go
index 67d4652..cc657eb 100644
--- a/internal/util/logger/logger.go
+++ b/internal/util/logger/logger.go
@@ -18,65 +18,64 @@
 package logger
 
 import (
-  "io"
-  "os"
-
-  "github.com/go-logr/logr"
-  "github.com/go-logr/zapr"
-  "go.uber.org/zap"
-  "go.uber.org/zap/zapcore"
-
-  
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+       "io"
+       "os"
+
+       "github.com/go-logr/logr"
+       "github.com/go-logr/zapr"
+       "go.uber.org/zap"
+       "go.uber.org/zap/zapcore"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
 )
 
 type Logger struct {
-  logr.Logger
-  out           io.Writer
-  logging       *logger.HertzBeatLogging
-  sugaredLogger *zap.SugaredLogger
+       logr.Logger
+       out           io.Writer
+       logging       *logger.HertzBeatLogging
+       sugaredLogger *zap.SugaredLogger
 }
 
 func NewLogger(w io.Writer, logging *logger.HertzBeatLogging) Logger {
 
-  logger := initZapLogger(w, logging, 
logging.Level[logger.LogComponentHertzbeatDefault])
+       logger := initZapLogger(w, logging, 
logging.Level[logger.LogComponentHertzbeatDefault])
 
-  return Logger{
-    Logger:        zapr.NewLogger(logger),
-    out:           w,
-    logging:       logging,
-    sugaredLogger: logger.Sugar(),
-  }
+       return Logger{
+               Logger:        zapr.NewLogger(logger),
+               out:           w,
+               logging:       logging,
+               sugaredLogger: logger.Sugar(),
+       }
 }
 
 func FileLogger(file, name string, level logger.LogLevel) Logger {
 
-  writer, err := os.OpenFile(file, os.O_WRONLY, 0o666)
-  if err != nil {
-    panic(err)
-  }
+       writer, err := os.OpenFile(file, os.O_WRONLY, 0o666)
+       if err != nil {
+               panic(err)
+       }
 
-  logging := logger.DefaultHertzbeatLogging()
-  logger := initZapLogger(writer, logging, level)
+       logging := logger.DefaultHertzbeatLogging()
+       logger := initZapLogger(writer, logging, level)
 
-  return Logger{
-    Logger:        zapr.NewLogger(logger).WithName(name),
-    logging:       logging,
-    out:           writer,
-    sugaredLogger: logger.Sugar(),
-  }
+       return Logger{
+               Logger:        zapr.NewLogger(logger).WithName(name),
+               logging:       logging,
+               out:           writer,
+               sugaredLogger: logger.Sugar(),
+       }
 }
 
 func DefaultLogger(out io.Writer, level logger.LogLevel) Logger {
 
-  logging := logger.DefaultHertzbeatLogging()
-  logger := initZapLogger(out, logging, level)
+       logging := logger.DefaultHertzbeatLogging()
+       logger := initZapLogger(out, logging, level)
 
-  return Logger{
-    Logger:        zapr.NewLogger(logger),
-    out:           out,
-    logging:       logging,
-    sugaredLogger: logger.Sugar(),
-  }
+       return Logger{
+               Logger:        zapr.NewLogger(logger),
+               out:           out,
+               logging:       logging,
+               sugaredLogger: logger.Sugar(),
+       }
 }
 
 // WithName returns a new Logger instance with the specified name element added
@@ -86,23 +85,23 @@ func DefaultLogger(out io.Writer, level logger.LogLevel) 
Logger {
 // more information).
 func (l Logger) WithName(name string) Logger {
 
-  logLevel := l.logging.Level[logger.HertzbeatLogComponent(name)]
-  logger := initZapLogger(l.out, l.logging, logLevel)
+       logLevel := l.logging.Level[logger.HertzbeatLogComponent(name)]
+       logger := initZapLogger(l.out, l.logging, logLevel)
 
-  return Logger{
-    Logger:        zapr.NewLogger(logger).WithName(name),
-    logging:       l.logging,
-    out:           l.out,
-    sugaredLogger: logger.Sugar().Named(name),
-  }
+       return Logger{
+               Logger:        zapr.NewLogger(logger).WithName(name),
+               logging:       l.logging,
+               out:           l.out,
+               sugaredLogger: logger.Sugar().Named(name),
+       }
 }
 
 // WithValues returns a new Logger instance with additional key/value pairs.
 // See Info for documentation on how key/value pairs work.
 func (l Logger) WithValues(keysAndValues ...interface{}) Logger {
 
-  l.Logger = l.Logger.WithValues(keysAndValues...)
-  return l
+       l.Logger = l.Logger.WithValues(keysAndValues...)
+       return l
 }
 
 // A Sugar wraps the base Logger functionality in a slower, but less
@@ -125,13 +124,13 @@ func (l Logger) WithValues(keysAndValues ...interface{}) 
Logger {
 //     Infoln(...any)         Println-style logger
 func (l Logger) Sugar() *zap.SugaredLogger {
 
-  return l.sugaredLogger
+       return l.sugaredLogger
 }
 
 func initZapLogger(w io.Writer, logging *logger.HertzBeatLogging, level 
logger.LogLevel) *zap.Logger {
 
-  parseLevel, _ := 
zapcore.ParseLevel(string(logging.DefaultHertzBeatLoggingLevel(level)))
-  core := 
zapcore.NewCore(zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()), 
zapcore.AddSync(w), zap.NewAtomicLevelAt(parseLevel))
+       parseLevel, _ := 
zapcore.ParseLevel(string(logging.DefaultHertzBeatLoggingLevel(level)))
+       core := 
zapcore.NewCore(zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()), 
zapcore.AddSync(w), zap.NewAtomicLevelAt(parseLevel))
 
-  return zap.New(core, zap.AddCaller())
+       return zap.New(core, zap.AddCaller())
 }
diff --git a/internal/util/logger/logger_test.go 
b/internal/util/logger/logger_test.go
index c3dd2cf..b3c7844 100644
--- a/internal/util/logger/logger_test.go
+++ b/internal/util/logger/logger_test.go
@@ -26,7 +26,7 @@ import (
        "github.com/stretchr/testify/require"
        "go.uber.org/zap"
        "go.uber.org/zap/zapcore"
-       loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+       loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
 )
 
 func TestZapLogLevel(t *testing.T) {
diff --git a/internal/util/param/param_replacer.go 
b/internal/util/param/param_replacer.go
index 984e8cd..4ee423f 100644
--- a/internal/util/param/param_replacer.go
+++ b/internal/util/param/param_replacer.go
@@ -26,9 +26,9 @@ import (
        "strconv"
        "strings"
 
-       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
-       loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job"
+       protocol2 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol"
+       loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/crypto"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
@@ -44,7 +44,7 @@ func NewReplacer() *Replacer {
 
 // PreprocessJobPasswords decrypts passwords in job configmap once during job 
creation
 // This permanently replaces encrypted passwords with decrypted ones in the 
job configmap
-func (r *Replacer) PreprocessJobPasswords(job *jobtypes.Job) error {
+func (r *Replacer) PreprocessJobPasswords(job *job.Job) error {
        if job == nil || job.Configmap == nil {
                return nil
        }
@@ -71,7 +71,7 @@ func (r *Replacer) PreprocessJobPasswords(job *jobtypes.Job) 
error {
 }
 
 // ReplaceJobParams replaces all parameter placeholders in job configuration
-func (r *Replacer) ReplaceJobParams(job *jobtypes.Job) (*jobtypes.Job, error) {
+func (r *Replacer) ReplaceJobParams(job *job.Job) (*job.Job, error) {
        if job == nil {
                return nil, fmt.Errorf("job is nil")
        }
@@ -97,7 +97,7 @@ func (r *Replacer) ReplaceJobParams(job *jobtypes.Job) 
(*jobtypes.Job, error) {
 
 // createParamMapSimple creates a parameter map from configmap entries
 // Assumes passwords have already been decrypted by PreprocessJobPasswords
-func (r *Replacer) createParamMapSimple(configmap []jobtypes.Configmap) 
map[string]string {
+func (r *Replacer) createParamMapSimple(configmap []job.Configmap) 
map[string]string {
        paramMap := make(map[string]string)
 
        for _, config := range configmap {
@@ -129,7 +129,7 @@ func (r *Replacer) createParamMapSimple(configmap 
[]jobtypes.Configmap) map[stri
 
 // ReplaceMetricsParams replaces parameters in metrics configuration
 // Exported method to be called by MetricsCollector
-func (r *Replacer) ReplaceMetricsParams(metrics *jobtypes.Metrics, paramMap 
map[string]string) error {
+func (r *Replacer) ReplaceMetricsParams(metrics *job.Metrics, paramMap 
map[string]string) error {
        // 1. JDBC
        if metrics.JDBC != nil {
                r.replaceJDBCParams(metrics.JDBC, paramMap)
@@ -159,7 +159,7 @@ func (r *Replacer) ReplaceMetricsParams(metrics 
*jobtypes.Metrics, paramMap map[
 }
 
 // replaceHTTPParams specific replacement logic for HTTPProtocol struct
-func (r *Replacer) replaceHTTPParams(http *protocol.HTTPProtocol, paramMap 
map[string]string) {
+func (r *Replacer) replaceHTTPParams(http *protocol2.HTTPProtocol, paramMap 
map[string]string) {
        http.URL = r.replaceParamPlaceholders(http.URL, paramMap)
        http.Method = r.replaceParamPlaceholders(http.Method, paramMap)
        http.Body = r.replaceParamPlaceholders(http.Body, paramMap)
@@ -207,7 +207,7 @@ func (r *Replacer) replaceHTTPParams(http 
*protocol.HTTPProtocol, paramMap map[s
 }
 
 // replaceRedisParams specific replacement logic for RedisProtocol struct
-func (r *Replacer) replaceRedisParams(redis *protocol.RedisProtocol, paramMap 
map[string]string) {
+func (r *Replacer) replaceRedisParams(redis *protocol2.RedisProtocol, paramMap 
map[string]string) {
        redis.Host = r.replaceParamPlaceholders(redis.Host, paramMap)
        redis.Port = r.replaceParamPlaceholders(redis.Port, paramMap)
        redis.Username = r.replaceParamPlaceholders(redis.Username, paramMap)
@@ -223,7 +223,7 @@ func (r *Replacer) replaceRedisParams(redis 
*protocol.RedisProtocol, paramMap ma
 }
 
 // replaceSSHParams specific replacement logic for SSHProtocol struct
-func (r *Replacer) replaceSSHParams(ssh *protocol.SSHProtocol, paramMap 
map[string]string) {
+func (r *Replacer) replaceSSHParams(ssh *protocol2.SSHProtocol, paramMap 
map[string]string) {
        ssh.Host = r.replaceParamPlaceholders(ssh.Host, paramMap)
        ssh.Port = r.replaceParamPlaceholders(ssh.Port, paramMap)
        ssh.Username = r.replaceParamPlaceholders(ssh.Username, paramMap)
@@ -244,7 +244,7 @@ func (r *Replacer) replaceSSHParams(ssh 
*protocol.SSHProtocol, paramMap map[stri
 }
 
 // replaceJDBCParams specific replacement logic for JDBCProtocol struct
-func (r *Replacer) replaceJDBCParams(jdbc *protocol.JDBCProtocol, paramMap 
map[string]string) {
+func (r *Replacer) replaceJDBCParams(jdbc *protocol2.JDBCProtocol, paramMap 
map[string]string) {
        jdbc.Host = r.replaceParamPlaceholders(jdbc.Host, paramMap)
        jdbc.Port = r.replaceParamPlaceholders(jdbc.Port, paramMap)
        jdbc.Platform = r.replaceParamPlaceholders(jdbc.Platform, paramMap)
@@ -265,7 +265,7 @@ func (r *Replacer) replaceJDBCParams(jdbc 
*protocol.JDBCProtocol, paramMap map[s
 }
 
 // replaceBasicMetricsParams replaces parameters in basic metrics fields
-func (r *Replacer) replaceBasicMetricsParams(metrics *jobtypes.Metrics, 
paramMap map[string]string) error {
+func (r *Replacer) replaceBasicMetricsParams(metrics *job.Metrics, paramMap 
map[string]string) error {
        metrics.Host = r.replaceParamPlaceholders(metrics.Host, paramMap)
        metrics.Port = r.replaceParamPlaceholders(metrics.Port, paramMap)
        metrics.Timeout = r.replaceParamPlaceholders(metrics.Timeout, paramMap)
@@ -321,14 +321,14 @@ func (r *Replacer) 
ExtractProtocolConfig(protocolInterface interface{}, targetSt
 }
 
 // ExtractJDBCConfig extracts and processes JDBC configuration
-func (r *Replacer) ExtractJDBCConfig(jdbcInterface interface{}) 
(*protocol.JDBCProtocol, error) {
+func (r *Replacer) ExtractJDBCConfig(jdbcInterface interface{}) 
(*protocol2.JDBCProtocol, error) {
        if jdbcInterface == nil {
                return nil, nil
        }
-       if jdbcConfig, ok := jdbcInterface.(*protocol.JDBCProtocol); ok {
+       if jdbcConfig, ok := jdbcInterface.(*protocol2.JDBCProtocol); ok {
                return jdbcConfig, nil
        }
-       var jdbcConfig protocol.JDBCProtocol
+       var jdbcConfig protocol2.JDBCProtocol
        if err := r.ExtractProtocolConfig(jdbcInterface, &jdbcConfig); err != 
nil {
                return nil, fmt.Errorf("failed to extract JDBC config: %w", err)
        }
@@ -336,14 +336,14 @@ func (r *Replacer) ExtractJDBCConfig(jdbcInterface 
interface{}) (*protocol.JDBCP
 }
 
 // ExtractHTTPConfig extracts and processes HTTP configuration
-func (r *Replacer) ExtractHTTPConfig(httpInterface interface{}) 
(*protocol.HTTPProtocol, error) {
+func (r *Replacer) ExtractHTTPConfig(httpInterface interface{}) 
(*protocol2.HTTPProtocol, error) {
        if httpInterface == nil {
                return nil, nil
        }
-       if httpConfig, ok := httpInterface.(*protocol.HTTPProtocol); ok {
+       if httpConfig, ok := httpInterface.(*protocol2.HTTPProtocol); ok {
                return httpConfig, nil
        }
-       var httpConfig protocol.HTTPProtocol
+       var httpConfig protocol2.HTTPProtocol
        if err := r.ExtractProtocolConfig(httpInterface, &httpConfig); err != 
nil {
                return nil, fmt.Errorf("failed to extract HTTP config: %w", err)
        }
@@ -351,14 +351,14 @@ func (r *Replacer) ExtractHTTPConfig(httpInterface 
interface{}) (*protocol.HTTPP
 }
 
 // ExtractSSHConfig extracts and processes SSH configuration
-func (r *Replacer) ExtractSSHConfig(sshInterface interface{}) 
(*protocol.SSHProtocol, error) {
+func (r *Replacer) ExtractSSHConfig(sshInterface interface{}) 
(*protocol2.SSHProtocol, error) {
        if sshInterface == nil {
                return nil, nil
        }
-       if sshConfig, ok := sshInterface.(*protocol.SSHProtocol); ok {
+       if sshConfig, ok := sshInterface.(*protocol2.SSHProtocol); ok {
                return sshConfig, nil
        }
-       var sshConfig protocol.SSHProtocol
+       var sshConfig protocol2.SSHProtocol
        if err := r.ExtractProtocolConfig(sshInterface, &sshConfig); err != nil 
{
                return nil, fmt.Errorf("failed to extract SSH config: %w", err)
        }
diff --git a/internal/util/ssh/ssh_helper.go b/internal/util/ssh/ssh_helper.go
index 77b7d21..03bc7ce 100644
--- a/internal/util/ssh/ssh_helper.go
+++ b/internal/util/ssh/ssh_helper.go
@@ -28,8 +28,8 @@ import (
        "strings"
 
        "golang.org/x/crypto/ssh"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol"
 
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
diff --git a/internal/util/ssh/ssh_helper_test.go 
b/internal/util/ssh/ssh_helper_test.go
index 4cbc6fd..9bda9d8 100644
--- a/internal/util/ssh/ssh_helper_test.go
+++ b/internal/util/ssh/ssh_helper_test.go
@@ -31,9 +31,9 @@ import (
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
        "golang.org/x/crypto/ssh"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/job/protocol"
+       loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/types/logger"
 
-       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job/protocol"
-       loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to