This is an automated email from the ASF dual-hosted git repository.

gaoxingcun pushed a commit to branch feature/go-collector-arrow-communication
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git

commit d8c7becd272bcc855795e07f2d9c01d76e1a221f
Author: TJxiaobao <[email protected]>
AuthorDate: Tue Oct 14 22:51:47 2025 +0800

    feat: implement Apache Arrow data communication with Java Manager
    
    1. Add Apache Arrow serialization for metrics data compatibility
    2. Implement complete result handling and message routing
    3. Fix MonitorID matching between Go collector and Java Manager
    4. Add one-time and cyclic task result aggregation
    5. Integrate message router with job scheduler
---
 Dockerfile                                         |   4 +-
 internal/cmd/server.go                             |  19 +-
 .../collector/basic/database/jdbc_collector.go     | 105 +++---
 .../common/collect/dispatch/metrics_collector.go   |  10 +-
 .../common/collect/lazy_message_router.go          | 417 +++++++++++++++++++++
 .../collector/common/collect/result_handler.go     |   9 +-
 .../common/dispatcher/common_dispatcher.go         |  67 +++-
 .../common/dispatcher/hashed_wheel_timer.go        |  41 +-
 internal/collector/common/dispatcher/time_wheel.go | 321 +++++++++++++++-
 .../common/dispatcher/wheel_timer_task.go          |  57 ++-
 internal/collector/common/router/message_router.go | 269 +++++++++++++
 internal/collector/common/transport/transport.go   |  28 +-
 internal/collector/common/types/job/job_types.go   |   9 +-
 .../collector/common/types/job/metrics_types.go    | 111 ++++--
 internal/transport/netty_client.go                 |   8 +-
 internal/transport/processors.go                   | 265 +++++++++++--
 internal/util/arrow/arrow_serializer.go            | 151 ++++++++
 internal/util/crypto/aes_util.go                   | 194 ++++++++++
 internal/util/param/param_replacer.go              | 391 +++++++++++++++++++
 19 files changed, 2303 insertions(+), 173 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 74c38a7..eb17107 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-FROM golang:1.23-alpine AS golang-builder
+FROM golang:1.25-alpine3.22 AS golang-builder
 
 ARG GOPROXY
 # ENV GOPROXY ${GOPROXY:-direct}
@@ -28,7 +28,7 @@ ENV BUILD_DIR /app
 
 COPY . ${BUILD_DIR}
 WORKDIR ${BUILD_DIR}
-RUN apk --no-cache add build-base git bash
+RUN apk --no-cache add build-base git bash golangci-lint
 
 RUN make init && \
     make fmt && \
diff --git a/internal/cmd/server.go b/internal/cmd/server.go
index 5ff4c45..1e98ceb 100644
--- a/internal/cmd/server.go
+++ b/internal/cmd/server.go
@@ -108,17 +108,20 @@ func server(ctx context.Context, logOut io.Writer) error {
 }
 
 func startRunners(ctx context.Context, cfg *clrserver.Server) error {
+       // Create job server first
+       jobRunner := jobserver.New(&jobserver.Config{
+               Server: *cfg,
+       })
+
+       // Create transport server and connect it to job server
+       transportRunner := transportserver.NewFromConfig(cfg.Config)
+       transportRunner.SetJobScheduler(jobRunner) // Connect transport to job 
scheduler
+
        runners := []struct {
                runner Runner[collectortypes.Info]
        }{
-               {
-                       jobserver.New(&jobserver.Config{
-                               Server: *cfg,
-                       }),
-               },
-               {
-                       transportserver.NewFromConfig(cfg.Config),
-               },
+               {jobRunner},
+               {transportRunner},
                // todo; add metrics
        }
 
diff --git a/internal/collector/basic/database/jdbc_collector.go 
b/internal/collector/basic/database/jdbc_collector.go
index 01fa7f2..dd74b2f 100644
--- a/internal/collector/basic/database/jdbc_collector.go
+++ b/internal/collector/basic/database/jdbc_collector.go
@@ -30,6 +30,7 @@ import (
        _ "github.com/microsoft/go-mssqldb"
        jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
 )
 
 const (
@@ -67,6 +68,13 @@ func NewJDBCCollector(logger logger.Logger) *JDBCCollector {
        }
 }
 
+// extractJDBCConfig extracts JDBC configuration from interface{} type
+// This function uses the parameter replacer for consistent configuration 
extraction
+func extractJDBCConfig(jdbcInterface interface{}) (*jobtypes.JDBCProtocol, 
error) {
+       replacer := param.NewReplacer()
+       return replacer.ExtractJDBCConfig(jdbcInterface)
+}
+
 // PreCheck validates the JDBC metrics configuration
 func (jc *JDBCCollector) PreCheck(metrics *jobtypes.Metrics) error {
        if metrics == nil {
@@ -77,44 +85,51 @@ func (jc *JDBCCollector) PreCheck(metrics 
*jobtypes.Metrics) error {
                return fmt.Errorf("JDBC protocol configuration is required")
        }
 
-       jdbc := metrics.JDBC
+       // Extract JDBC configuration
+       jdbcConfig, err := extractJDBCConfig(metrics.JDBC)
+       if err != nil {
+               return fmt.Errorf("invalid JDBC configuration: %w", err)
+       }
+       if jdbcConfig == nil {
+               return fmt.Errorf("JDBC configuration is required")
+       }
 
        // Validate required fields when URL is not provided
-       if jdbc.URL == "" {
-               if jdbc.Host == "" {
+       if jdbcConfig.URL == "" {
+               if jdbcConfig.Host == "" {
                        return fmt.Errorf("host is required when URL is not 
provided")
                }
-               if jdbc.Port == "" {
+               if jdbcConfig.Port == "" {
                        return fmt.Errorf("port is required when URL is not 
provided")
                }
-               if jdbc.Platform == "" {
+               if jdbcConfig.Platform == "" {
                        return fmt.Errorf("platform is required when URL is not 
provided")
                }
        }
 
        // Validate platform
-       if jdbc.Platform != "" {
-               switch jdbc.Platform {
+       if jdbcConfig.Platform != "" {
+               switch jdbcConfig.Platform {
                case PlatformMySQL, PlatformMariaDB, PlatformPostgreSQL, 
PlatformSQLServer, PlatformOracle:
                        // Valid platforms
                default:
-                       return fmt.Errorf("unsupported database platform: %s", 
jdbc.Platform)
+                       return fmt.Errorf("unsupported database platform: %s", 
jdbcConfig.Platform)
                }
        }
 
        // Validate query type
-       if jdbc.QueryType != "" {
-               switch jdbc.QueryType {
+       if jdbcConfig.QueryType != "" {
+               switch jdbcConfig.QueryType {
                case QueryTypeOneRow, QueryTypeMultiRow, QueryTypeColumns, 
QueryTypeRunScript:
                        // Valid query types
                default:
-                       return fmt.Errorf("unsupported query type: %s", 
jdbc.QueryType)
+                       return fmt.Errorf("unsupported query type: %s", 
jdbcConfig.QueryType)
                }
        }
 
        // Validate SQL for most query types
-       if jdbc.QueryType != QueryTypeRunScript && jdbc.SQL == "" {
-               return fmt.Errorf("SQL is required for query type: %s", 
jdbc.QueryType)
+       if jdbcConfig.QueryType != QueryTypeRunScript && jdbcConfig.SQL == "" {
+               return fmt.Errorf("SQL is required for query type: %s", 
jdbcConfig.QueryType)
        }
 
        return nil
@@ -123,27 +138,35 @@ func (jc *JDBCCollector) PreCheck(metrics 
*jobtypes.Metrics) error {
 // Collect performs JDBC metrics collection
 func (jc *JDBCCollector) Collect(metrics *jobtypes.Metrics) 
*jobtypes.CollectRepMetricsData {
        startTime := time.Now()
-       jdbc := metrics.JDBC
 
-       jc.logger.Info("starting JDBC collection",
-               "host", jdbc.Host,
-               "port", jdbc.Port,
-               "platform", jdbc.Platform,
-               "database", jdbc.Database,
-               "queryType", jdbc.QueryType)
+       // Extract JDBC configuration
+       jdbcConfig, err := extractJDBCConfig(metrics.JDBC)
+       if err != nil {
+               jc.logger.Error(err, "failed to extract JDBC config")
+               return jc.createFailResponse(metrics, CodeFail, 
fmt.Sprintf("JDBC config error: %v", err))
+       }
+       if jdbcConfig == nil {
+               return jc.createFailResponse(metrics, CodeFail, "JDBC 
configuration is required")
+       }
+
+       // Debug level only for collection start
+       jc.logger.V(1).Info("starting JDBC collection",
+               "host", jdbcConfig.Host,
+               "platform", jdbcConfig.Platform,
+               "queryType", jdbcConfig.QueryType)
 
        // Get timeout
-       timeout := jc.getTimeout(jdbc.Timeout)
+       timeout := jc.getTimeout(jdbcConfig.Timeout)
 
        // Get database URL
-       databaseURL, err := jc.constructDatabaseURL(jdbc)
+       databaseURL, err := jc.constructDatabaseURL(jdbcConfig)
        if err != nil {
                jc.logger.Error(err, "failed to construct database URL")
                return jc.createFailResponse(metrics, CodeFail, 
fmt.Sprintf("Database URL error: %v", err))
        }
 
        // Create database connection
-       db, err := jc.getConnection(databaseURL, jdbc.Username, jdbc.Password, 
timeout)
+       db, err := jc.getConnection(databaseURL, jdbcConfig.Username, 
jdbcConfig.Password, timeout)
        if err != nil {
                jc.logger.Error(err, "failed to connect to database")
                return jc.createFailResponse(metrics, CodeUnConnectable, 
fmt.Sprintf("Connection error: %v", err))
@@ -153,26 +176,27 @@ func (jc *JDBCCollector) Collect(metrics 
*jobtypes.Metrics) *jobtypes.CollectRep
        // Execute query based on type with context
        response := jc.createSuccessResponse(metrics)
 
-       switch jdbc.QueryType {
+       switch jdbcConfig.QueryType {
        case QueryTypeOneRow:
-               err = jc.queryOneRow(db, jdbc.SQL, metrics.Aliasfields, 
response)
+               err = jc.queryOneRow(db, jdbcConfig.SQL, metrics.Aliasfields, 
response)
        case QueryTypeMultiRow:
-               err = jc.queryMultiRow(db, jdbc.SQL, metrics.Aliasfields, 
response)
+               err = jc.queryMultiRow(db, jdbcConfig.SQL, metrics.Aliasfields, 
response)
        case QueryTypeColumns:
-               err = jc.queryColumns(db, jdbc.SQL, metrics.Aliasfields, 
response)
+               err = jc.queryColumns(db, jdbcConfig.SQL, metrics.Aliasfields, 
response)
        case QueryTypeRunScript:
-               err = jc.runScript(db, jdbc.SQL, response)
+               err = jc.runScript(db, jdbcConfig.SQL, response)
        default:
-               err = fmt.Errorf("unsupported query type: %s", jdbc.QueryType)
+               err = fmt.Errorf("unsupported query type: %s", 
jdbcConfig.QueryType)
        }
 
        if err != nil {
-               jc.logger.Error(err, "query execution failed", "queryType", 
jdbc.QueryType)
+               jc.logger.Error(err, "query execution failed", "queryType", 
jdbcConfig.QueryType)
                return jc.createFailResponse(metrics, CodeFail, 
fmt.Sprintf("Query error: %v", err))
        }
 
        duration := time.Since(startTime)
-       jc.logger.Info("JDBC collection completed",
+       // Debug level only for successful completion
+       jc.logger.V(1).Info("JDBC collection completed",
                "duration", duration,
                "rowCount", len(response.Values))
 
@@ -195,9 +219,8 @@ func (jc *JDBCCollector) getTimeout(timeoutStr string) 
time.Duration {
                return duration
        }
 
-       // If it's a pure number, treat it as seconds (more reasonable for 
database connections)
        if timeout, err := strconv.Atoi(timeoutStr); err == nil {
-               return time.Duration(timeout) * time.Second
+               return time.Duration(timeout) * time.Millisecond
        }
 
        return 30 * time.Second // fallback to default
@@ -205,10 +228,6 @@ func (jc *JDBCCollector) getTimeout(timeoutStr string) 
time.Duration {
 
 // constructDatabaseURL constructs the database connection URL
 func (jc *JDBCCollector) constructDatabaseURL(jdbc *jobtypes.JDBCProtocol) 
(string, error) {
-       // If URL is provided directly, use it
-       if jdbc.URL != "" {
-               return jdbc.URL, nil
-       }
 
        // Construct URL based on platform
        host := jdbc.Host
@@ -217,7 +236,8 @@ func (jc *JDBCCollector) constructDatabaseURL(jdbc 
*jobtypes.JDBCProtocol) (stri
 
        switch jdbc.Platform {
        case PlatformMySQL, PlatformMariaDB:
-               return 
fmt.Sprintf("mysql://%s:%s@tcp(%s:%s)/%s?parseTime=true&charset=utf8mb4",
+               // MySQL DSN format: 
[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
+               return 
fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?parseTime=true&charset=utf8mb4",
                        jdbc.Username, jdbc.Password, host, port, database), nil
        case PlatformPostgreSQL:
                return fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable",
@@ -234,14 +254,13 @@ func (jc *JDBCCollector) constructDatabaseURL(jdbc 
*jobtypes.JDBCProtocol) (stri
 func (jc *JDBCCollector) getConnection(databaseURL, username, password string, 
timeout time.Duration) (*sql.DB, error) {
        // Extract driver name from URL
        var driverName string
-       if strings.HasPrefix(databaseURL, "mysql://") {
-               driverName = "mysql"
-               // Remove mysql:// prefix for the driver
-               databaseURL = strings.TrimPrefix(databaseURL, "mysql://")
-       } else if strings.HasPrefix(databaseURL, "postgres://") {
+       if strings.HasPrefix(databaseURL, "postgres://") {
                driverName = "postgres"
        } else if strings.HasPrefix(databaseURL, "sqlserver://") {
                driverName = "sqlserver"
+       } else if strings.Contains(databaseURL, "@tcp(") {
+               // MySQL DSN format (no protocol prefix)
+               driverName = "mysql"
        } else {
                return nil, fmt.Errorf("unsupported database URL format: %s", 
databaseURL)
        }
diff --git a/internal/collector/common/collect/dispatch/metrics_collector.go 
b/internal/collector/common/collect/dispatch/metrics_collector.go
index 85e7580..bc3ea2e 100644
--- a/internal/collector/common/collect/dispatch/metrics_collector.go
+++ b/internal/collector/common/collect/dispatch/metrics_collector.go
@@ -53,8 +53,8 @@ func (mc *MetricsCollector) CollectMetrics(metrics 
*jobtypes.Metrics, job *jobty
        go func() {
                defer close(resultChan)
 
-               mc.logger.Info("starting metrics collection",
-                       "jobID", job.ID,
+               // Debug level only for collection start
+               mc.logger.V(1).Info("starting metrics collection",
                        "metricsName", metrics.Name,
                        "protocol", metrics.Protocol)
 
@@ -72,7 +72,7 @@ func (mc *MetricsCollector) CollectMetrics(metrics 
*jobtypes.Metrics, job *jobty
                        return
                }
 
-               // Perform the actual collection
+               // Perform the actual collection with metrics (parameters 
should already be replaced by CommonDispatcher)
                result := collector.Collect(metrics)
 
                // Enrich result with job information
@@ -101,9 +101,9 @@ func (mc *MetricsCollector) CollectMetrics(metrics 
*jobtypes.Metrics, job *jobty
 
                duration := time.Since(startTime)
 
+               // Only log failures at INFO level, success at debug level
                if result != nil && result.Code == http.StatusOK {
-                       mc.logger.Info("metrics collection completed 
successfully",
-                               "jobID", job.ID,
+                       mc.logger.V(1).Info("metrics collection completed 
successfully",
                                "metricsName", metrics.Name,
                                "protocol", metrics.Protocol,
                                "duration", duration,
diff --git a/internal/collector/common/collect/lazy_message_router.go 
b/internal/collector/common/collect/lazy_message_router.go
new file mode 100644
index 0000000..b12137d
--- /dev/null
+++ b/internal/collector/common/collect/lazy_message_router.go
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package collect
+
+import (
+       "bytes"
+       "encoding/binary"
+       "fmt"
+
+       "github.com/apache/arrow/go/v13/arrow"
+       "github.com/apache/arrow/go/v13/arrow/array"
+       "github.com/apache/arrow/go/v13/arrow/ipc"
+       "github.com/apache/arrow/go/v13/arrow/memory"
+
+       pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// TransportRunner interface for getting transport client
+type TransportRunner interface {
+       GetClient() transport.TransportClient
+       IsConnected() bool
+}
+
+// LazyMessageRouter is a message router that lazily obtains transport client
+// Solves the startup order dependency between transport client and job runner
+type LazyMessageRouter struct {
+       transportRunner TransportRunner
+       logger          logger.Logger
+       identity        string
+}
+
+// NewLazyMessageRouter creates a new lazy message router
+func NewLazyMessageRouter(transportRunner TransportRunner, logger 
logger.Logger, identity string) MessageRouter {
+       if identity == "" {
+               identity = "collector-go" // Default identity
+       }
+
+       return &LazyMessageRouter{
+               transportRunner: transportRunner,
+               logger:          logger.WithName("lazy-message-router"),
+               identity:        identity,
+       }
+}
+
+// SendResult implements MessageRouter interface
+func (l *LazyMessageRouter) SendResult(data *jobtypes.CollectRepMetricsData, 
job *jobtypes.Job) error {
+       // Get transport client
+       client := l.transportRunner.GetClient()
+       if client == nil || !client.IsStarted() {
+               l.logger.V(1).Info("transport client not ready, dropping 
result",
+                       "jobID", job.ID,
+                       "metricsName", data.Metrics,
+                       "isCyclic", job.IsCyclic)
+               return fmt.Errorf("transport client not ready")
+       }
+
+       // Send result directly
+       return l.sendResultDirectly(data, job, client)
+}
+
+// sendResultDirectly sends result directly to manager
+func (l *LazyMessageRouter) sendResultDirectly(data 
*jobtypes.CollectRepMetricsData, job *jobtypes.Job, client 
transport.TransportClient) error {
+       // Determine message type
+       var msgType pb.MessageType
+       if job.IsCyclic {
+               msgType = pb.MessageType_RESPONSE_CYCLIC_TASK_DATA
+       } else {
+               msgType = pb.MessageType_RESPONSE_ONE_TIME_TASK_DATA
+       }
+
+       // Serialize data to Arrow format
+       dataBytes, err := 
l.serializeToArrow([]*jobtypes.CollectRepMetricsData{data})
+       if err != nil {
+               l.logger.Error(err, "failed to create arrow format",
+                       "jobID", job.ID,
+                       "metricsName", data.Metrics)
+               return fmt.Errorf("failed to create arrow format: %w", err)
+       }
+
+       // Create message - collector sends data to Manager as RESPONSE 
(response to task)
+       msg := &pb.Message{
+               Type:      msgType,
+               Direction: pb.Direction_RESPONSE,
+               Identity:  l.identity,
+               Msg:       dataBytes,
+       }
+
+       // Send message
+       if err := client.SendMsg(msg); err != nil {
+               l.logger.Error(err, "failed to send metrics data",
+                       "jobID", job.ID,
+                       "metricsName", data.Metrics,
+                       "messageType", msgType)
+               return fmt.Errorf("failed to send metrics data: %w", err)
+       }
+
+       l.logger.Info("successfully sent metrics data",
+               "jobID", job.ID,
+               "metricsName", data.Metrics,
+               "isCyclic", job.IsCyclic,
+               "messageType", msgType,
+               "dataSize", len(dataBytes),
+               "direction", msg.Direction,
+               "identity", msg.Identity)
+
+       // Add detailed debugging information
+       l.logger.Info("message details for debugging",
+               "msgType", int(msgType),
+               "direction", int(msg.Direction),
+               "identity", msg.Identity,
+               "dataLength", len(dataBytes))
+
+       return nil
+}
+
+// serializeToArrow serializes data using Apache Arrow format, compatible with 
Java Manager
+func (l *LazyMessageRouter) serializeToArrow(dataList 
[]*jobtypes.CollectRepMetricsData) ([]byte, error) {
+       var mainBuf bytes.Buffer
+
+       // Write root count (format expected by Java Manager)
+       rootCount := int32(len(dataList))
+       if err := binary.Write(&mainBuf, binary.BigEndian, rootCount); err != 
nil {
+               return nil, fmt.Errorf("failed to write root count: %w", err)
+       }
+
+       mem := memory.NewGoAllocator()
+
+       // Create separate Arrow stream for each data item
+       for i, data := range dataList {
+               recordBatch, err := l.createArrowRecordBatch(mem, data)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to create record batch 
for data %d: %w", i, err)
+               }
+
+               // Create Arrow stream
+               var streamBuf bytes.Buffer
+               writer := ipc.NewWriter(&streamBuf, 
ipc.WithSchema(recordBatch.Schema()))
+               if err := writer.Write(recordBatch); err != nil {
+                       recordBatch.Release()
+                       writer.Close()
+                       return nil, fmt.Errorf("failed to write record batch 
%d: %w", i, err)
+               }
+               if err := writer.Close(); err != nil {
+                       recordBatch.Release()
+                       return nil, fmt.Errorf("failed to close writer for 
batch %d: %w", i, err)
+               }
+               recordBatch.Release()
+
+               // Write stream data to main buffer
+               streamData := streamBuf.Bytes()
+               if _, err := mainBuf.Write(streamData); err != nil {
+                       return nil, fmt.Errorf("failed to write stream data for 
batch %d: %w", i, err)
+               }
+       }
+
+       return mainBuf.Bytes(), nil
+}
+
+// createArrowRecordBatch creates Arrow RecordBatch for single MetricsData, 
compatible with Java Manager
+func (l *LazyMessageRouter) createArrowRecordBatch(mem memory.Allocator, data 
*jobtypes.CollectRepMetricsData) (arrow.Record, error) {
+       // Create metadata for fields (all fields use the same default metadata)
+       emptyMetadata := arrow.MetadataFrom(map[string]string{
+               "type":  "1",
+               "label": "false",
+               "unit":  "none",
+       })
+
+       // Define metadata fields (fixed fields)
+       metadataFields := []arrow.Field{
+               {Name: "app", Type: arrow.BinaryTypes.String, Nullable: true, 
Metadata: emptyMetadata},
+               {Name: "metrics", Type: arrow.BinaryTypes.String, Nullable: 
true, Metadata: emptyMetadata},
+               {Name: "id", Type: arrow.BinaryTypes.String, Nullable: true, 
Metadata: emptyMetadata},
+               {Name: "monitorId", Type: arrow.BinaryTypes.String, Nullable: 
true, Metadata: emptyMetadata},
+               {Name: "tenantId", Type: arrow.BinaryTypes.String, Nullable: 
true, Metadata: emptyMetadata},
+               {Name: "priority", Type: arrow.BinaryTypes.String, Nullable: 
true, Metadata: emptyMetadata},
+               {Name: "time", Type: arrow.BinaryTypes.String, Nullable: true, 
Metadata: emptyMetadata},
+               {Name: "code", Type: arrow.BinaryTypes.String, Nullable: true, 
Metadata: emptyMetadata},
+               {Name: "msg", Type: arrow.BinaryTypes.String, Nullable: true, 
Metadata: emptyMetadata},
+       }
+
+       // Add dynamic fields (based on collected field definitions)
+       dynamicFields := make([]arrow.Field, 0, len(data.Fields))
+       for _, field := range data.Fields {
+               // Ensure unit is not empty
+               unitValue := field.Unit
+               if unitValue == "" {
+                       unitValue = "none"
+               }
+
+               // Create field metadata
+               typeValue := fmt.Sprintf("%d", field.Type)
+               labelValue := fmt.Sprintf("%t", field.Label)
+
+               fieldMetadata := arrow.MetadataFrom(map[string]string{
+                       "type":  typeValue,
+                       "label": labelValue,
+                       "unit":  unitValue,
+               })
+
+               dynamicFields = append(dynamicFields, arrow.Field{
+                       Name:     field.Field,
+                       Type:     arrow.BinaryTypes.String,
+                       Nullable: true,
+                       Metadata: fieldMetadata,
+               })
+       }
+
+       // Merge all fields
+       allFields := make([]arrow.Field, 0, 
len(metadataFields)+len(dynamicFields))
+       allFields = append(allFields, metadataFields...)
+       allFields = append(allFields, dynamicFields...)
+
+       // Create schema-level metadata
+       schemaMetadata := arrow.MetadataFrom(map[string]string{
+               "id":          fmt.Sprintf("%d", data.ID),
+               "tenantId":    fmt.Sprintf("%d", data.TenantID),
+               "app":         data.App,
+               "metrics":     data.Metrics,
+               "priority":    fmt.Sprintf("%d", data.Priority),
+               "time":        fmt.Sprintf("%d", data.Time),
+               "labels":      "",
+               "annotations": "",
+       })
+
+       schema := arrow.NewSchema(allFields, &schemaMetadata)
+
+       // Create builders (all fields are String type)
+       builders := make([]array.Builder, len(allFields))
+       for i, field := range allFields {
+               builders[i] = array.NewBuilder(mem, field.Type)
+       }
+       defer func() {
+               for _, builder := range builders {
+                       builder.Release()
+               }
+       }()
+
+       // Determine row count
+       rowCount := len(data.Values)
+       if rowCount == 0 {
+               rowCount = 1 // At least one row of metadata
+       }
+
+       // Fill data
+       for rowIdx := 0; rowIdx < rowCount; rowIdx++ {
+               // Fill metadata fields
+               builders[0].(*array.StringBuilder).Append(data.App)
+               builders[1].(*array.StringBuilder).Append(data.Metrics)
+               builders[2].(*array.StringBuilder).Append(fmt.Sprintf("%d", 
data.ID))
+               builders[3].(*array.StringBuilder).Append(fmt.Sprintf("%d", 
data.MonitorID))
+               builders[4].(*array.StringBuilder).Append(fmt.Sprintf("%d", 
data.TenantID))
+               builders[5].(*array.StringBuilder).Append(fmt.Sprintf("%d", 
data.Priority))
+               builders[6].(*array.StringBuilder).Append(fmt.Sprintf("%d", 
data.Time))
+               builders[7].(*array.StringBuilder).Append(fmt.Sprintf("%d", 
data.Code))
+               builders[8].(*array.StringBuilder).Append(data.Msg)
+
+               // Fill dynamic field data
+               if rowIdx < len(data.Values) {
+                       valueRow := data.Values[rowIdx]
+                       for i, _ := range data.Fields {
+                               builderIdx := len(metadataFields) + i
+                               var value string
+                               if i < len(valueRow.Columns) {
+                                       value = valueRow.Columns[i]
+                               }
+                               
builders[builderIdx].(*array.StringBuilder).Append(value)
+                       }
+               } else {
+                       // Fill empty values
+                       for i := range data.Fields {
+                               builderIdx := len(metadataFields) + i
+                               
builders[builderIdx].(*array.StringBuilder).Append("")
+                       }
+               }
+       }
+
+       // Build arrays
+       arrays := make([]arrow.Array, len(builders))
+       for i, builder := range builders {
+               arrays[i] = builder.NewArray()
+               defer arrays[i].Release()
+       }
+
+       // Create Record
+       record := array.NewRecord(schema, arrays, int64(rowCount))
+       return record, nil
+}
+
+// createUnifiedArrowRecordBatch creates a unified RecordBatch containing all 
MetricsData
+func (l *LazyMessageRouter) createUnifiedArrowRecordBatch(mem 
memory.Allocator, dataList []*jobtypes.CollectRepMetricsData) (arrow.Record, 
error) {
+       if len(dataList) == 0 {
+               return nil, fmt.Errorf("empty data list")
+       }
+
+       // Use the first data item to define schema
+       firstData := dataList[0]
+
+       // Define basic fields - simplified schema, only core fields
+       fields := []arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64},
+               {Name: "monitorId", Type: arrow.PrimitiveTypes.Int64},
+               {Name: "app", Type: arrow.BinaryTypes.String},
+               {Name: "metrics", Type: arrow.BinaryTypes.String},
+               {Name: "time", Type: arrow.PrimitiveTypes.Int64},
+               {Name: "code", Type: arrow.PrimitiveTypes.Int32},
+               {Name: "msg", Type: arrow.BinaryTypes.String},
+       }
+
+       // Add value fields - based on first data item's Fields
+       if len(firstData.Fields) > 0 {
+               for _, field := range firstData.Fields {
+                       fields = append(fields, arrow.Field{
+                               Name: field.Field,
+                               Type: arrow.BinaryTypes.String,
+                       })
+               }
+       }
+
+       schema := arrow.NewSchema(fields, nil)
+
+       // Create builders
+       builders := make([]array.Builder, len(fields))
+       for i, field := range fields {
+               switch field.Type {
+               case arrow.PrimitiveTypes.Int64:
+                       builders[i] = array.NewInt64Builder(mem)
+               case arrow.PrimitiveTypes.Int32:
+                       builders[i] = array.NewInt32Builder(mem)
+               default:
+                       builders[i] = array.NewStringBuilder(mem)
+               }
+       }
+
+       // Calculate total rows
+       totalRows := 0
+       for _, data := range dataList {
+               if len(data.Values) > 0 {
+                       totalRows += len(data.Values)
+               } else {
+                       totalRows += 1 // At least one row of metadata
+               }
+       }
+
+       // Fill data
+       for _, data := range dataList {
+               rowCount := 1
+               if len(data.Values) > 0 {
+                       rowCount = len(data.Values)
+               }
+
+               for row := 0; row < rowCount; row++ {
+                       // Basic fields
+                       builders[0].(*array.Int64Builder).Append(data.ID)
+                       builders[1].(*array.Int64Builder).Append(data.MonitorID)
+                       builders[2].(*array.StringBuilder).Append(data.App)
+                       builders[3].(*array.StringBuilder).Append(data.Metrics)
+                       builders[4].(*array.Int64Builder).Append(data.Time)
+                       
builders[5].(*array.Int32Builder).Append(int32(data.Code))
+                       builders[6].(*array.StringBuilder).Append(data.Msg)
+
+                       // Value fields
+                       if len(data.Values) > row && len(data.Fields) > 0 {
+                               valueRow := data.Values[row]
+                               for i := range data.Fields {
+                                       builderIndex := 7 + i // Index after 
basic fields
+                                       if builderIndex < len(builders) {
+                                               if i < len(valueRow.Columns) {
+                                                       
builders[builderIndex].(*array.StringBuilder).Append(valueRow.Columns[i])
+                                               } else {
+                                                       
builders[builderIndex].(*array.StringBuilder).Append("")
+                                               }
+                                       }
+                               }
+                       } else {
+                               // Fill empty values
+                               for i := 0; i < len(firstData.Fields); i++ {
+                                       builderIndex := 7 + i
+                                       if builderIndex < len(builders) {
+                                               
builders[builderIndex].(*array.StringBuilder).Append("")
+                                       }
+                               }
+                       }
+               }
+       }
+
+       // Build arrays
+       arrays := make([]arrow.Array, len(builders))
+       for i, builder := range builders {
+               arrays[i] = builder.NewArray()
+               defer arrays[i].Release()
+               builder.Release()
+       }
+
+       // Create RecordBatch
+       record := array.NewRecord(schema, arrays, int64(totalRows))
+       return record, nil
+}
diff --git a/internal/collector/common/collect/result_handler.go 
b/internal/collector/common/collect/result_handler.go
index 7f5a304..22ceb1f 100644
--- a/internal/collector/common/collect/result_handler.go
+++ b/internal/collector/common/collect/result_handler.go
@@ -52,9 +52,8 @@ func (rh *ResultHandlerImpl) HandleCollectData(data 
*jobtypes.CollectRepMetricsD
                return fmt.Errorf("collect data is nil")
        }
 
-       rh.logger.Info("handling collect data",
-               "jobID", job.ID,
-               "monitorID", job.MonitorID,
+       // Debug level only for data handling
+       rh.logger.V(1).Info("handling collect data",
                "metricsName", data.Metrics,
                "code", data.Code,
                "valuesCount", len(data.Values))
@@ -67,9 +66,9 @@ func (rh *ResultHandlerImpl) HandleCollectData(data 
*jobtypes.CollectRepMetricsD
        // 4. Triggering alerts based on thresholds
        // 5. Updating monitoring status
 
+       // Only log failures at INFO level, success at debug level
        if data.Code == http.StatusOK {
-               rh.logger.Info("successfully processed collect data",
-                       "jobID", job.ID,
+               rh.logger.V(1).Info("successfully processed collect data",
                        "metricsName", data.Metrics)
        } else {
                rh.logger.Info("received failed collect data",
diff --git a/internal/collector/common/dispatcher/common_dispatcher.go 
b/internal/collector/common/dispatcher/common_dispatcher.go
index 137304a..8d37be5 100644
--- a/internal/collector/common/dispatcher/common_dispatcher.go
+++ b/internal/collector/common/dispatcher/common_dispatcher.go
@@ -28,6 +28,7 @@ import (
        
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/collect"
        jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
 )
 
 // CommonDispatcherImpl is responsible for breaking down jobs into individual 
metrics collection tasks
@@ -59,9 +60,9 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx 
context.Context, job *jo
                return fmt.Errorf("job cannot be nil")
        }
 
-       cd.logger.Info("dispatching metrics task",
+       // Dispatching metrics task - only log at debug level
+       cd.logger.V(1).Info("dispatching metrics task",
                "jobID", job.ID,
-               "monitorID", job.MonitorID,
                "app", job.App,
                "metricsCount", len(job.Metrics))
 
@@ -74,6 +75,16 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx 
context.Context, job *jo
                return nil
        }
 
+       // Replace parameters in job configuration ONCE before concurrent 
collection
+       // This avoids concurrent map access issues in MetricsCollector
+       paramReplacer := param.NewReplacer()
+       processedJob, err := paramReplacer.ReplaceJobParams(job)
+       if err != nil {
+               cd.logger.Error(err, "failed to replace job parameters",
+                       "jobID", job.ID)
+               return fmt.Errorf("parameter replacement failed: %w", err)
+       }
+
        // Create collection context with timeout
        collectCtx, collectCancel := context.WithTimeout(ctx, 
cd.getCollectionTimeout(job))
        defer collectCancel()
@@ -82,13 +93,26 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx 
context.Context, job *jo
        resultChannels := make([]chan *jobtypes.CollectRepMetricsData, 
len(metricsToCollect))
 
        for i, metrics := range metricsToCollect {
-               cd.logger.Info("starting metrics collection",
-                       "jobID", job.ID,
+               // Starting metrics collection - debug level only
+               cd.logger.V(1).Info("starting metrics collection",
                        "metricsName", metrics.Name,
                        "protocol", metrics.Protocol)
 
-               // Start metrics collection in goroutine
-               resultChannels[i] = cd.metricsCollector.CollectMetrics(metrics, 
job, timeout)
+               // Find the processed metrics in the processed job
+               var processedMetrics *jobtypes.Metrics
+               for j := range processedJob.Metrics {
+                       if processedJob.Metrics[j].Name == metrics.Name {
+                               processedMetrics = &processedJob.Metrics[j]
+                               break
+                       }
+               }
+               if processedMetrics == nil {
+                       cd.logger.Error(nil, "processed metrics not found", 
"metricsName", metrics.Name)
+                       continue
+               }
+
+               // Start metrics collection in goroutine with processed data
+               resultChannels[i] = 
cd.metricsCollector.CollectMetrics(processedMetrics, processedJob, timeout)
        }
 
        // Collect results from all metrics collection tasks
@@ -100,8 +124,8 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx 
context.Context, job *jo
                case result := <-resultChan:
                        if result != nil {
                                results = append(results, result)
-                               cd.logger.Info("received metrics result",
-                                       "jobID", job.ID,
+                               // Metrics result received - debug level only
+                               cd.logger.V(1).Info("received metrics result",
                                        "metricsName", metricsToCollect[i].Name,
                                        "code", result.Code)
                        }
@@ -123,18 +147,26 @@ func (cd *CommonDispatcherImpl) DispatchMetricsTask(ctx 
context.Context, job *jo
                return err
        }
 
-       cd.logger.Info("successfully dispatched metrics task",
-               "jobID", job.ID,
-               "resultsCount", len(results),
-               "errorsCount", len(errors),
-               "duration", duration)
+       // Log a completion summary at INFO only when errors occurred; clean runs go to debug
+       if len(errors) > 0 {
+               cd.logger.Info("metrics task completed with errors",
+                       "jobID", job.ID,
+                       "errorsCount", len(errors),
+                       "duration", duration)
+       } else {
+               cd.logger.V(1).Info("metrics task completed successfully",
+                       "jobID", job.ID,
+                       "resultsCount", len(results),
+                       "duration", duration)
+       }
 
        return nil
 }
 
 // handleResults processes the collection results and decides on next actions
 func (cd *CommonDispatcherImpl) handleResults(results 
[]*jobtypes.CollectRepMetricsData, job *jobtypes.Job, errors []error) error {
-       cd.logger.Info("handling collection results",
+       // Debug level only for result handling
+       cd.logger.V(1).Info("handling collection results",
                "jobID", job.ID,
                "resultsCount", len(results),
                "errorsCount", len(errors))
@@ -163,7 +195,8 @@ func (cd *CommonDispatcherImpl) handleResults(results 
[]*jobtypes.CollectRepMetr
 
 // evaluateNextActions decides what to do next based on collection results
 func (cd *CommonDispatcherImpl) evaluateNextActions(job *jobtypes.Job, results 
[]*jobtypes.CollectRepMetricsData, errors []error) {
-       cd.logger.Info("evaluating next actions",
+       // Debug level only for next actions evaluation
+       cd.logger.V(1).Info("evaluating next actions",
                "jobID", job.ID,
                "resultsCount", len(results),
                "errorsCount", len(errors))
@@ -172,13 +205,13 @@ func (cd *CommonDispatcherImpl) evaluateNextActions(job 
*jobtypes.Job, results [
        hasNextLevel := cd.hasNextLevelMetrics(job, results)
 
        if hasNextLevel {
-               cd.logger.Info("job has next level metrics to collect", 
"jobID", job.ID)
+               cd.logger.V(1).Info("job has next level metrics to collect", 
"jobID", job.ID)
                // TODO: Schedule next level metrics collection
        }
 
        // For cyclic jobs, the rescheduling is handled by WheelTimerTask
        if job.IsCyclic {
-               cd.logger.Info("cyclic job will be rescheduled by timer", 
"jobID", job.ID)
+               cd.logger.V(1).Info("cyclic job will be rescheduled by timer", 
"jobID", job.ID)
        }
 
        // TODO: Send results to data queue for further processing
diff --git a/internal/collector/common/dispatcher/hashed_wheel_timer.go 
b/internal/collector/common/dispatcher/hashed_wheel_timer.go
index 3ebe931..9100205 100644
--- a/internal/collector/common/dispatcher/hashed_wheel_timer.go
+++ b/internal/collector/common/dispatcher/hashed_wheel_timer.go
@@ -100,7 +100,8 @@ func (hwt *hashedWheelTimer) NewTimeout(task 
jobtypes.TimerTask, delay time.Dura
        bucket.timeouts = append(bucket.timeouts, timeout)
        bucket.mu.Unlock()
 
-       hwt.logger.Info("created timeout",
+       // Log timeout creation only for debugging
+       hwt.logger.V(1).Info("created timeout",
                "delay", delay,
                "bucketIndex", bucketIndex,
                "targetTick", targetTick)
@@ -114,9 +115,7 @@ func (hwt *hashedWheelTimer) Start(ctx context.Context) 
error {
                return nil // already started
        }
 
-       hwt.logger.Info("starting hashed wheel timer",
-               "wheelSize", hwt.wheelSize,
-               "tickDuration", hwt.tickDuration)
+       hwt.logger.Info("starting hashed wheel timer")
 
        hwt.startTime = time.Now()
        hwt.ticker = time.NewTicker(hwt.tickDuration)
@@ -138,8 +137,6 @@ func (hwt *hashedWheelTimer) Stop() error {
                return nil // already stopped
        }
 
-       hwt.logger.Info("stopping hashed wheel timer")
-
        hwt.cancel()
 
        if hwt.ticker != nil {
@@ -174,6 +171,14 @@ func (hwt *hashedWheelTimer) tick() {
        bucket := hwt.wheel[bucketIndex]
        bucket.mu.Lock()
 
+       // Only log tick processing if there are timeouts to process
+       if len(bucket.timeouts) > 0 {
+               hwt.logger.V(1).Info("processing tick",
+                       "currentTick", currentTick,
+                       "bucketIndex", bucketIndex,
+                       "timeoutsCount", len(bucket.timeouts))
+       }
+
        // Process expired timeouts
        var remaining []*jobtypes.Timeout
        for _, timeout := range bucket.timeouts {
@@ -181,8 +186,18 @@ func (hwt *hashedWheelTimer) tick() {
                        continue // skip cancelled timeouts
                }
 
-               // Check if timeout has expired
-               if time.Now().After(timeout.Deadline()) {
+               now := time.Now()
+               deadline := timeout.Deadline()
+
+               // Check if timeout should execute based on wheel position
+               // A timeout should execute when currentTick >= its targetTick
+               wheelIndex := timeout.WheelIndex()
+               if currentTick >= int64(wheelIndex) {
+                       // Only log actual task execution at INFO level
+                       hwt.logger.Info("executing scheduled task",
+                               "currentTick", currentTick,
+                               "targetTick", wheelIndex)
+
                        // Execute the timeout task asynchronously
                        go func(t *jobtypes.Timeout) {
                                defer func() {
@@ -196,7 +211,12 @@ func (hwt *hashedWheelTimer) tick() {
                                }
                        }(timeout)
                } else {
-                       // Not yet expired, keep in bucket
+                       // Not yet time to execute, keep in bucket
+                       hwt.logger.V(1).Info("timeout not ready by wheel 
position",
+                               "currentTick", currentTick,
+                               "targetTick", wheelIndex,
+                               "deadline", deadline,
+                               "now", now)
                        remaining = append(remaining, timeout)
                }
        }
@@ -216,10 +236,11 @@ func (hwt *hashedWheelTimer) GetStats() 
map[string]interface{} {
 
        return map[string]interface{}{
                "wheelSize":     hwt.wheelSize,
-               "tickDuration":  hwt.tickDuration,
+               "tickDuration":  hwt.tickDuration.String(),
                "currentTick":   atomic.LoadInt64(&hwt.currentTick),
                "totalTimeouts": totalTimeouts,
                "started":       atomic.LoadInt32(&hwt.started) == 1,
                "stopped":       atomic.LoadInt32(&hwt.stopped) == 1,
+               "wheelPeriod":   (time.Duration(hwt.wheelSize) * 
hwt.tickDuration).String(),
        }
 }
diff --git a/internal/collector/common/dispatcher/time_wheel.go 
b/internal/collector/common/dispatcher/time_wheel.go
index 30d02ad..d8c7aa1 100644
--- a/internal/collector/common/dispatcher/time_wheel.go
+++ b/internal/collector/common/dispatcher/time_wheel.go
@@ -29,6 +29,16 @@ import (
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
+// JobInfo holds information about a scheduled job
+type JobInfo struct {
+       Job             *jobtypes.Job
+       Timeout         *jobtypes.Timeout
+       CreatedAt       time.Time
+       LastExecutedAt  time.Time
+       NextExecutionAt time.Time
+       ExecutionCount  int64
+}
+
 // TimeDispatch manages time-based job scheduling using a hashed wheel timer
 type TimeDispatch struct {
        logger           logger.Logger
@@ -36,8 +46,26 @@ type TimeDispatch struct {
        commonDispatcher MetricsTaskDispatcher
        cyclicTasks      sync.Map // map[int64]*jobtypes.Timeout for cyclic jobs
        tempTasks        sync.Map // map[int64]*jobtypes.Timeout for one-time 
jobs
+       jobInfos         sync.Map // map[int64]*JobInfo for detailed job 
information
        started          bool
        mu               sync.RWMutex
+
+       // Statistics
+       stats       *DispatcherStats
+       statsLogger *time.Ticker
+       statsCancel context.CancelFunc
+}
+
+// DispatcherStats holds dispatcher statistics
+type DispatcherStats struct {
+       StartTime          time.Time
+       TotalJobsAdded     int64
+       TotalJobsRemoved   int64
+       TotalJobsExecuted  int64
+       TotalJobsCompleted int64
+       TotalJobsFailed    int64
+       LastExecutionTime  time.Time
+       mu                 sync.RWMutex
 }
 
 // MetricsTaskDispatcher interface for metrics task dispatching
@@ -50,6 +78,7 @@ type HashedWheelTimer interface {
        NewTimeout(task jobtypes.TimerTask, delay time.Duration) 
*jobtypes.Timeout
        Start(ctx context.Context) error
        Stop() error
+       GetStats() map[string]interface{}
 }
 
 // NewTimeDispatch creates a new time dispatcher
@@ -58,6 +87,9 @@ func NewTimeDispatch(logger logger.Logger, commonDispatcher 
MetricsTaskDispatche
                logger:           logger.WithName("time-dispatch"),
                commonDispatcher: commonDispatcher,
                started:          false,
+               stats: &DispatcherStats{
+                       StartTime: time.Now(),
+               },
        }
 
        // Create hashed wheel timer with reasonable defaults
@@ -74,32 +106,56 @@ func (td *TimeDispatch) AddJob(job *jobtypes.Job) error {
                return fmt.Errorf("job cannot be nil")
        }
 
-       td.logger.Info("adding job to time dispatcher",
+       // Log job addition at debug level
+       td.logger.V(1).Info("adding job",
                "jobID", job.ID,
-               "isCyclic", job.IsCyclic,
                "interval", job.DefaultInterval)
 
+       // Update statistics
+       td.stats.mu.Lock()
+       td.stats.TotalJobsAdded++
+       td.stats.mu.Unlock()
+
        // Create wheel timer task
-       timerTask := NewWheelTimerTask(job, td.commonDispatcher, td.logger)
+       timerTask := NewWheelTimerTask(job, td.commonDispatcher, td, td.logger)
 
        // Calculate delay
        var delay time.Duration
        if job.DefaultInterval > 0 {
-               delay = time.Duration(job.DefaultInterval) * time.Second
+               delay = time.Duration(job.DefaultInterval) * time.Millisecond
+               td.logger.V(1).Info("calculated delay",
+                       "interval", job.DefaultInterval,
+                       "delay", delay)
        } else {
-               delay = 30 * time.Second // default interval
+               delay = 10 * time.Second
        }
 
        // Create timeout
        timeout := td.wheelTimer.NewTimeout(timerTask, delay)
 
-       // Store timeout based on job type
+       // Create job info
+       now := time.Now()
+       jobInfo := &JobInfo{
+               Job:             job,
+               Timeout:         timeout,
+               CreatedAt:       now,
+               NextExecutionAt: now.Add(delay),
+               ExecutionCount:  0,
+       }
+
+       // Store timeout and job info based on job type
        if job.IsCyclic {
                td.cyclicTasks.Store(job.ID, timeout)
-               td.logger.Info("added cyclic job to scheduler", "jobID", 
job.ID, "delay", delay)
+               td.jobInfos.Store(job.ID, jobInfo)
+               td.logger.Info("scheduled cyclic job",
+                       "jobID", job.ID,
+                       "nextExecution", jobInfo.NextExecutionAt)
        } else {
                td.tempTasks.Store(job.ID, timeout)
-               td.logger.Info("added one-time job to scheduler", "jobID", 
job.ID, "delay", delay)
+               td.jobInfos.Store(job.ID, jobInfo)
+               td.logger.Info("scheduled one-time job",
+                       "jobID", job.ID,
+                       "nextExecution", jobInfo.NextExecutionAt)
        }
 
        return nil
@@ -107,13 +163,19 @@ func (td *TimeDispatch) AddJob(job *jobtypes.Job) error {
 
 // RemoveJob removes a job from the scheduler
 func (td *TimeDispatch) RemoveJob(jobID int64) error {
-       td.logger.Info("removing job from time dispatcher", "jobID", jobID)
+       td.logger.V(1).Info("removing job", "jobID", jobID)
+
+       // Update statistics
+       td.stats.mu.Lock()
+       td.stats.TotalJobsRemoved++
+       td.stats.mu.Unlock()
 
        // Try to remove from cyclic tasks
        if timeoutInterface, exists := td.cyclicTasks.LoadAndDelete(jobID); 
exists {
                if timeout, ok := timeoutInterface.(*jobtypes.Timeout); ok {
                        timeout.Cancel()
-                       td.logger.Info("removed cyclic job", "jobID", jobID)
+                       td.jobInfos.Delete(jobID)
+                       td.logger.V(1).Info("removed cyclic job", "jobID", 
jobID)
                        return nil
                }
        }
@@ -122,7 +184,8 @@ func (td *TimeDispatch) RemoveJob(jobID int64) error {
        if timeoutInterface, exists := td.tempTasks.LoadAndDelete(jobID); 
exists {
                if timeout, ok := timeoutInterface.(*jobtypes.Timeout); ok {
                        timeout.Cancel()
-                       td.logger.Info("removed one-time job", "jobID", jobID)
+                       td.jobInfos.Delete(jobID)
+                       td.logger.V(1).Info("removed one-time job", "jobID", 
jobID)
                        return nil
                }
        }
@@ -146,8 +209,11 @@ func (td *TimeDispatch) Start(ctx context.Context) error {
                return fmt.Errorf("failed to start wheel timer: %w", err)
        }
 
+       // Start periodic statistics logging
+       td.startStatsLogging(ctx)
+
        td.started = true
-       td.logger.Info("time dispatcher started successfully")
+       // Dispatcher started; the startup log line was removed to reduce noise
        return nil
 }
 
@@ -162,6 +228,9 @@ func (td *TimeDispatch) Stop() error {
 
        td.logger.Info("stopping time dispatcher")
 
+       // Stop statistics logging
+       td.stopStatsLogging()
+
        // Cancel all running tasks
        td.cyclicTasks.Range(func(key, value interface{}) bool {
                if timeout, ok := value.(*jobtypes.Timeout); ok {
@@ -180,6 +249,7 @@ func (td *TimeDispatch) Stop() error {
        // Clear maps
        td.cyclicTasks = sync.Map{}
        td.tempTasks = sync.Map{}
+       td.jobInfos = sync.Map{}
 
        // Stop wheel timer
        if err := td.wheelTimer.Stop(); err != nil {
@@ -206,9 +276,230 @@ func (td *TimeDispatch) Stats() map[string]any {
                return true
        })
 
+       td.stats.mu.RLock()
+       uptime := time.Since(td.stats.StartTime)
+       totalJobsAdded := td.stats.TotalJobsAdded
+       totalJobsRemoved := td.stats.TotalJobsRemoved
+       totalJobsExecuted := td.stats.TotalJobsExecuted
+       totalJobsCompleted := td.stats.TotalJobsCompleted
+       totalJobsFailed := td.stats.TotalJobsFailed
+       lastExecutionTime := td.stats.LastExecutionTime
+       td.stats.mu.RUnlock()
+
        return map[string]any{
-               "cyclicJobs": cyclicCount,
-               "tempJobs":   tempCount,
-               "started":    td.started,
+               "cyclicJobs":         cyclicCount,
+               "tempJobs":           tempCount,
+               "started":            td.started,
+               "uptime":             uptime,
+               "totalJobsAdded":     totalJobsAdded,
+               "totalJobsRemoved":   totalJobsRemoved,
+               "totalJobsExecuted":  totalJobsExecuted,
+               "totalJobsCompleted": totalJobsCompleted,
+               "totalJobsFailed":    totalJobsFailed,
+               "lastExecutionTime":  lastExecutionTime,
+       }
+}
+
+// RecordJobExecution records job execution statistics
+func (td *TimeDispatch) RecordJobExecution() {
+       td.stats.mu.Lock()
+       td.stats.TotalJobsExecuted++
+       td.stats.LastExecutionTime = time.Now()
+       td.stats.mu.Unlock()
+}
+
+// RecordJobCompleted records job completion
+func (td *TimeDispatch) RecordJobCompleted() {
+       td.stats.mu.Lock()
+       td.stats.TotalJobsCompleted++
+       td.stats.mu.Unlock()
+}
+
+// RecordJobFailed records job failure
+func (td *TimeDispatch) RecordJobFailed() {
+       td.stats.mu.Lock()
+       td.stats.TotalJobsFailed++
+       td.stats.mu.Unlock()
+}
+
+// UpdateJobExecution updates job execution information
+func (td *TimeDispatch) UpdateJobExecution(jobID int64) {
+       if jobInfoInterface, exists := td.jobInfos.Load(jobID); exists {
+               if jobInfo, ok := jobInfoInterface.(*JobInfo); ok {
+                       now := time.Now()
+                       jobInfo.LastExecutedAt = now
+                       jobInfo.ExecutionCount++
+
+                       // Update next execution time for cyclic jobs
+                       if jobInfo.Job.IsCyclic && jobInfo.Job.DefaultInterval 
> 0 {
+                               jobInfo.NextExecutionAt = 
now.Add(time.Duration(jobInfo.Job.DefaultInterval) * time.Second)
+                       }
+               }
        }
 }
+
+// RescheduleJob reschedules a cyclic job for the next execution
+func (td *TimeDispatch) RescheduleJob(job *jobtypes.Job) error {
+       if !job.IsCyclic || job.DefaultInterval <= 0 {
+               return fmt.Errorf("job is not cyclable or has invalid interval")
+       }
+
+       // Create new timer task for next execution
+       nextDelay := time.Duration(job.DefaultInterval) * time.Second
+       timerTask := NewWheelTimerTask(job, td.commonDispatcher, td, td.logger)
+
+       // Schedule the next execution
+       timeout := td.wheelTimer.NewTimeout(timerTask, nextDelay)
+
+       // Update the timeout in cyclicTasks
+       td.cyclicTasks.Store(job.ID, timeout)
+
+       // Update job info next execution time
+       if jobInfoInterface, exists := td.jobInfos.Load(job.ID); exists {
+               if jobInfo, ok := jobInfoInterface.(*JobInfo); ok {
+                       jobInfo.NextExecutionAt = time.Now().Add(nextDelay)
+                       jobInfo.Timeout = timeout
+               }
+       }
+
+       nextExecutionTime := time.Now().Add(nextDelay)
+       td.logger.Info("rescheduled cyclic job",
+               "jobID", job.ID,
+               "interval", job.DefaultInterval,
+               "nextExecution", nextExecutionTime,
+               "delay", nextDelay)
+
+       return nil
+}
+
+// startStatsLogging starts periodic statistics logging
+func (td *TimeDispatch) startStatsLogging(ctx context.Context) {
+       // Create a context that can be cancelled
+       statsCtx, cancel := context.WithCancel(ctx)
+       td.statsCancel = cancel
+
+       // Create ticker for 1-minute intervals
+       td.statsLogger = time.NewTicker(1 * time.Minute)
+
+       go func() {
+               defer td.statsLogger.Stop()
+
+               // Log initial statistics
+               td.logStatistics()
+
+               for {
+                       select {
+                       case <-td.statsLogger.C:
+                               td.logStatistics()
+                       case <-statsCtx.Done():
+                               td.logger.Info("statistics logging stopped")
+                               return
+                       }
+               }
+       }()
+
+       td.logger.Info("statistics logging started", "interval", "1m")
+}
+
+// stopStatsLogging stops periodic statistics logging
+func (td *TimeDispatch) stopStatsLogging() {
+       if td.statsCancel != nil {
+               td.statsCancel()
+               td.statsCancel = nil
+       }
+       if td.statsLogger != nil {
+               td.statsLogger.Stop()
+               td.statsLogger = nil
+       }
+}
+
+// logStatistics logs current dispatcher statistics
+func (td *TimeDispatch) logStatistics() {
+       stats := td.Stats()
+
+       // Get detailed job information
+       var cyclicJobDetails []map[string]any
+       var tempJobDetails []map[string]any
+
+       td.cyclicTasks.Range(func(key, value interface{}) bool {
+               if jobID, ok := key.(int64); ok {
+                       if jobInfoInterface, exists := td.jobInfos.Load(jobID); 
exists {
+                               if jobInfo, ok := jobInfoInterface.(*JobInfo); 
ok {
+                                       now := time.Now()
+                                       detail := map[string]any{
+                                               "jobID":         jobID,
+                                               "app":           
jobInfo.Job.App,
+                                               "monitorID":     
jobInfo.Job.MonitorID,
+                                               "interval":      
jobInfo.Job.DefaultInterval,
+                                               "createdAt":     
jobInfo.CreatedAt,
+                                               "lastExecuted":  
jobInfo.LastExecutedAt,
+                                               "nextExecution": 
jobInfo.NextExecutionAt,
+                                               "execCount":     
jobInfo.ExecutionCount,
+                                               "timeSinceLastExec": func() 
string {
+                                                       if 
jobInfo.LastExecutedAt.IsZero() {
+                                                               return "never"
+                                                       }
+                                                       return 
now.Sub(jobInfo.LastExecutedAt).String()
+                                               }(),
+                                               "timeToNextExec": func() string 
{
+                                                       if 
jobInfo.NextExecutionAt.IsZero() {
+                                                               return "unknown"
+                                                       }
+                                                       if 
jobInfo.NextExecutionAt.Before(now) {
+                                                               return "overdue"
+                                                       }
+                                                       return 
jobInfo.NextExecutionAt.Sub(now).String()
+                                               }(),
+                                       }
+                                       cyclicJobDetails = 
append(cyclicJobDetails, detail)
+                               }
+                       }
+               }
+               return true
+       })
+
+       td.tempTasks.Range(func(key, value interface{}) bool {
+               if jobID, ok := key.(int64); ok {
+                       if jobInfoInterface, exists := td.jobInfos.Load(jobID); 
exists {
+                               if jobInfo, ok := jobInfoInterface.(*JobInfo); 
ok {
+                                       detail := map[string]any{
+                                               "jobID":         jobID,
+                                               "app":           
jobInfo.Job.App,
+                                               "monitorID":     
jobInfo.Job.MonitorID,
+                                               "createdAt":     
jobInfo.CreatedAt,
+                                               "nextExecution": 
jobInfo.NextExecutionAt,
+                                               "execCount":     
jobInfo.ExecutionCount,
+                                       }
+                                       tempJobDetails = append(tempJobDetails, 
detail)
+                               }
+                       }
+               }
+               return true
+       })
+
+       // Get timer wheel statistics
+       var timerStats map[string]interface{}
+       if td.wheelTimer != nil {
+               timerStats = td.wheelTimer.GetStats()
+       }
+
+       td.logger.Info("📊 Dispatcher Statistics Report",
+               "uptime", stats["uptime"],
+               "activeJobs", map[string]any{
+                       "cyclic": stats["cyclicJobs"],
+                       "temp":   stats["tempJobs"],
+                       "total":  stats["cyclicJobs"].(int) + 
stats["tempJobs"].(int),
+               },
+               "jobCounters", map[string]any{
+                       "added":     stats["totalJobsAdded"],
+                       "removed":   stats["totalJobsRemoved"],
+                       "executed":  stats["totalJobsExecuted"],
+                       "completed": stats["totalJobsCompleted"],
+                       "failed":    stats["totalJobsFailed"],
+               },
+               "timerWheelConfig", timerStats,
+               "lastExecution", stats["lastExecutionTime"],
+               "cyclicJobs", cyclicJobDetails,
+               "tempJobs", tempJobDetails,
+       )
+}
diff --git a/internal/collector/common/dispatcher/wheel_timer_task.go 
b/internal/collector/common/dispatcher/wheel_timer_task.go
index 3eb53e3..657103a 100644
--- a/internal/collector/common/dispatcher/wheel_timer_task.go
+++ b/internal/collector/common/dispatcher/wheel_timer_task.go
@@ -28,19 +28,30 @@ import (
        "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
 )
 
+// StatsRecorder interface for recording job execution statistics
+type StatsRecorder interface {
+       RecordJobExecution()
+       RecordJobCompleted()
+       RecordJobFailed()
+       UpdateJobExecution(jobID int64)
+       RescheduleJob(job *jobtypes.Job) error
+}
+
 // WheelTimerTask represents a task that runs in the time wheel
 // This corresponds to Java's WheelTimerTask
 type WheelTimerTask struct {
        job              *jobtypes.Job
        commonDispatcher MetricsTaskDispatcher
+       statsRecorder    StatsRecorder
        logger           logger.Logger
 }
 
 // NewWheelTimerTask creates a new wheel timer task
-func NewWheelTimerTask(job *jobtypes.Job, commonDispatcher 
MetricsTaskDispatcher, logger logger.Logger) *WheelTimerTask {
+func NewWheelTimerTask(job *jobtypes.Job, commonDispatcher 
MetricsTaskDispatcher, statsRecorder StatsRecorder, logger logger.Logger) 
*WheelTimerTask {
        return &WheelTimerTask{
                job:              job,
                commonDispatcher: commonDispatcher,
+               statsRecorder:    statsRecorder,
                logger:           logger.WithName("wheel-timer-task"),
        }
 }
@@ -53,11 +64,16 @@ func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout) 
error {
                return fmt.Errorf("job is nil, cannot execute")
        }
 
-       wtt.logger.Info("executing wheel timer task",
+       wtt.logger.V(1).Info("executing task",
                "jobID", wtt.job.ID,
-               "monitorID", wtt.job.MonitorID,
                "app", wtt.job.App)
 
+       // Record job execution start
+       if wtt.statsRecorder != nil {
+               wtt.statsRecorder.RecordJobExecution()
+               wtt.statsRecorder.UpdateJobExecution(wtt.job.ID)
+       }
+
        startTime := time.Now()
 
        // Create a context for this specific task execution
@@ -74,13 +90,23 @@ func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout) 
error {
                wtt.logger.Error(err, "failed to dispatch metrics task",
                        "jobID", wtt.job.ID,
                        "duration", duration)
+
+               // Record job failure
+               if wtt.statsRecorder != nil {
+                       wtt.statsRecorder.RecordJobFailed()
+               }
                return err
        }
 
-       wtt.logger.Info("successfully dispatched metrics task",
+       wtt.logger.V(1).Info("task completed",
                "jobID", wtt.job.ID,
                "duration", duration)
 
+       // Record job completion
+       if wtt.statsRecorder != nil {
+               wtt.statsRecorder.RecordJobCompleted()
+       }
+
        // If this is a cyclic job, reschedule it for the next execution
        if wtt.job.IsCyclic && wtt.job.DefaultInterval > 0 {
                wtt.rescheduleJob(timeout)
@@ -91,17 +117,20 @@ func (wtt *WheelTimerTask) Run(timeout *jobtypes.Timeout) 
error {
 
 // rescheduleJob reschedules a cyclic job for the next execution
 func (wtt *WheelTimerTask) rescheduleJob(timeout *jobtypes.Timeout) {
-       wtt.logger.Info("rescheduling cyclic job",
-               "jobID", wtt.job.ID,
-               "interval", wtt.job.DefaultInterval)
+       if wtt.job.DefaultInterval <= 0 {
+               wtt.logger.Info("job has no valid interval, not rescheduling",
+                       "jobID", wtt.job.ID)
+               return
+       }
 
-       // TODO: Implement rescheduling logic
-       // This would involve creating a new timeout with the same job
-       // and adding it back to the time wheel
-       // For now, we'll log that rescheduling is needed
-       wtt.logger.Info("cyclic job needs rescheduling",
-               "jobID", wtt.job.ID,
-               "nextExecutionIn", 
time.Duration(wtt.job.DefaultInterval)*time.Second)
+       // Use the stats recorder to reschedule the job
+       if wtt.statsRecorder != nil {
+               if err := wtt.statsRecorder.RescheduleJob(wtt.job); err != nil {
+                       wtt.logger.Error(err, "failed to reschedule job", 
"jobID", wtt.job.ID)
+               }
+       } else {
+               wtt.logger.Error(nil, "stats recorder is nil", "jobID", 
wtt.job.ID)
+       }
 }
 
 // GetJob returns the associated job
diff --git a/internal/collector/common/router/message_router.go 
b/internal/collector/common/router/message_router.go
new file mode 100644
index 0000000..262dff7
--- /dev/null
+++ b/internal/collector/common/router/message_router.go
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package router
+
+import (
+       "encoding/json"
+       "fmt"
+
+       pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
+       
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/job"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
+// MessageType constants matching Java version
+const (
+       MessageTypeHeartbeat                int32 = 0
+       MessageTypeGoOnline                 int32 = 1
+       MessageTypeGoOffline                int32 = 2
+       MessageTypeGoClose                  int32 = 3
+       MessageTypeIssueCyclicTask          int32 = 4
+       MessageTypeDeleteCyclicTask         int32 = 5
+       MessageTypeIssueOneTimeTask         int32 = 6
+       MessageTypeResponseCyclicTaskData   int32 = 7
+       MessageTypeResponseOneTimeTaskData  int32 = 8
+       MessageTypeResponseCyclicTaskSdData int32 = 9
+)
+
+// MessageRouter is the interface for routing messages between transport and 
job components
+type MessageRouter interface {
+       // RegisterProcessors registers all message processors
+       RegisterProcessors()
+
+       // SendResult sends collection results back to the manager
+       SendResult(data *jobtypes.CollectRepMetricsData, job *jobtypes.Job) 
error
+
+       // GetIdentity returns the collector identity
+       GetIdentity() string
+}
+
+// MessageRouterImpl implements the MessageRouter interface
+type MessageRouterImpl struct {
+       logger    logger.Logger
+       client    transport.TransportClient
+       jobRunner *job.Runner
+       identity  string
+}
+
+// Config represents message router configuration
+type Config struct {
+       Logger    logger.Logger
+       Client    transport.TransportClient
+       JobRunner *job.Runner
+       Identity  string
+}
+
+// New creates a new message router
+func New(cfg *Config) MessageRouter {
+       return &MessageRouterImpl{
+               logger:    cfg.Logger.WithName("message-router"),
+               client:    cfg.Client,
+               jobRunner: cfg.JobRunner,
+               identity:  cfg.Identity,
+       }
+}
+
+// RegisterProcessors registers all message processors
+func (r *MessageRouterImpl) RegisterProcessors() {
+       if r.client == nil {
+               r.logger.Error(nil, "cannot register processors: client is nil")
+               return
+       }
+
+       // Register cyclic task processor
+       r.client.RegisterProcessor(MessageTypeIssueCyclicTask, func(msg 
interface{}) (interface{}, error) {
+               if pbMsg, ok := msg.(*pb.Message); ok {
+                       return r.handleIssueCyclicTask(pbMsg)
+               }
+               return nil, fmt.Errorf("invalid message type")
+       })
+
+       // Register delete cyclic task processor
+       r.client.RegisterProcessor(MessageTypeDeleteCyclicTask, func(msg 
interface{}) (interface{}, error) {
+               if pbMsg, ok := msg.(*pb.Message); ok {
+                       return r.handleDeleteCyclicTask(pbMsg)
+               }
+               return nil, fmt.Errorf("invalid message type")
+       })
+
+       // Register one-time task processor
+       r.client.RegisterProcessor(MessageTypeIssueOneTimeTask, func(msg 
interface{}) (interface{}, error) {
+               if pbMsg, ok := msg.(*pb.Message); ok {
+                       return r.handleIssueOneTimeTask(pbMsg)
+               }
+               return nil, fmt.Errorf("invalid message type")
+       })
+
+       // Other processors can be registered here
+       r.logger.Info("Successfully registered all message processors")
+}
+
+// handleIssueCyclicTask handles cyclic task messages
+func (r *MessageRouterImpl) handleIssueCyclicTask(msg *pb.Message) 
(*pb.Message, error) {
+       r.logger.Info("Handling cyclic task message")
+
+       // Parse job from message
+       var job jobtypes.Job
+       if err := json.Unmarshal(msg.Msg, &job); err != nil {
+               r.logger.Error(err, "failed to unmarshal job")
+               return &pb.Message{
+                       Type:      pb.MessageType_ISSUE_CYCLIC_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  r.identity,
+                       Msg:       []byte("failed to unmarshal job"),
+               }, nil
+       }
+
+       // Add job to job runner
+       if err := r.jobRunner.AddAsyncCollectJob(&job); err != nil {
+               r.logger.Error(err, "failed to add job to job runner", "jobID", 
job.ID)
+               return &pb.Message{
+                       Type:      pb.MessageType_ISSUE_CYCLIC_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  r.identity,
+                       Msg:       []byte(fmt.Sprintf("failed to add job: %v", 
err)),
+               }, nil
+       }
+
+       return &pb.Message{
+               Type:      pb.MessageType_ISSUE_CYCLIC_TASK,
+               Direction: pb.Direction_RESPONSE,
+               Identity:  r.identity,
+               Msg:       []byte("job added successfully"),
+       }, nil
+}
+
+// handleDeleteCyclicTask handles delete cyclic task messages
+func (r *MessageRouterImpl) handleDeleteCyclicTask(msg *pb.Message) 
(*pb.Message, error) {
+       r.logger.Info("Handling delete cyclic task message")
+
+       // Parse job IDs from message
+       var jobIDs []int64
+       if err := json.Unmarshal(msg.Msg, &jobIDs); err != nil {
+               r.logger.Error(err, "failed to unmarshal job IDs")
+               return &pb.Message{
+                       Type:      pb.MessageType_DELETE_CYCLIC_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  r.identity,
+                       Msg:       []byte("failed to unmarshal job IDs"),
+               }, nil
+       }
+
+       // Remove jobs from job runner
+       for _, jobID := range jobIDs {
+               if err := r.jobRunner.RemoveAsyncCollectJob(jobID); err != nil {
+                       r.logger.Error(err, "failed to remove job from job 
runner", "jobID", jobID)
+                       // Continue with other jobs even if one fails
+               }
+       }
+
+       return &pb.Message{
+               Type:      pb.MessageType_DELETE_CYCLIC_TASK,
+               Direction: pb.Direction_RESPONSE,
+               Identity:  r.identity,
+               Msg:       []byte("jobs removed successfully"),
+       }, nil
+}
+
+// handleIssueOneTimeTask handles one-time task messages
+func (r *MessageRouterImpl) handleIssueOneTimeTask(msg *pb.Message) 
(*pb.Message, error) {
+       r.logger.Info("Handling one-time task message")
+
+       // Parse job from message
+       var job jobtypes.Job
+       if err := json.Unmarshal(msg.Msg, &job); err != nil {
+               r.logger.Error(err, "failed to unmarshal job")
+               return &pb.Message{
+                       Type:      pb.MessageType_ISSUE_ONE_TIME_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  r.identity,
+                       Msg:       []byte("failed to unmarshal job"),
+               }, nil
+       }
+
+       // Set job as non-cyclic
+       job.IsCyclic = false
+
+       // Add job to job runner
+       if err := r.jobRunner.AddAsyncCollectJob(&job); err != nil {
+               r.logger.Error(err, "failed to add one-time job to job runner", 
"jobID", job.ID)
+               return &pb.Message{
+                       Type:      pb.MessageType_ISSUE_ONE_TIME_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  r.identity,
+                       Msg:       []byte(fmt.Sprintf("failed to add one-time 
job: %v", err)),
+               }, nil
+       }
+
+       return &pb.Message{
+               Type:      pb.MessageType_ISSUE_ONE_TIME_TASK,
+               Direction: pb.Direction_RESPONSE,
+               Identity:  r.identity,
+               Msg:       []byte("one-time job added successfully"),
+       }, nil
+}
+
+// SendResult sends collection results back to the manager
+func (r *MessageRouterImpl) SendResult(data *jobtypes.CollectRepMetricsData, 
job *jobtypes.Job) error {
+       if r.client == nil || !r.client.IsStarted() {
+               return fmt.Errorf("transport client not started")
+       }
+
+       // Determine message type based on job type
+       var msgType pb.MessageType
+       if job.IsCyclic {
+               msgType = pb.MessageType_RESPONSE_CYCLIC_TASK_DATA
+       } else {
+               msgType = pb.MessageType_RESPONSE_ONE_TIME_TASK_DATA
+       }
+
+       // Serialize metrics data
+       dataBytes, err := json.Marshal([]jobtypes.CollectRepMetricsData{*data})
+       if err != nil {
+               return fmt.Errorf("failed to marshal metrics data: %w", err)
+       }
+
+       // Create message
+       msg := &pb.Message{
+               Type:      msgType,
+               Direction: pb.Direction_REQUEST,
+               Identity:  r.identity,
+               Msg:       dataBytes,
+       }
+
+       // Send message
+       if err := r.client.SendMsg(msg); err != nil {
+               return fmt.Errorf("failed to send metrics data: %w", err)
+       }
+
+       r.logger.Info("Successfully sent metrics data",
+               "jobID", job.ID,
+               "metricsName", data.Metrics,
+               "isCyclic", job.IsCyclic)
+
+       return nil
+}
+
+// GetIdentity returns the collector identity
+func (r *MessageRouterImpl) GetIdentity() string {
+       return r.identity
+}
diff --git a/internal/collector/common/transport/transport.go 
b/internal/collector/common/transport/transport.go
index ee1da89..44058e3 100644
--- a/internal/collector/common/transport/transport.go
+++ b/internal/collector/common/transport/transport.go
@@ -45,8 +45,9 @@ const (
 )
 
 type Runner struct {
-       Config *configtypes.CollectorConfig
-       client transport.TransportClient
+       Config       *configtypes.CollectorConfig
+       client       transport.TransportClient
+       jobScheduler transport.JobScheduler
        clrserver.Server
 }
 
@@ -59,6 +60,11 @@ func New(cfg *configtypes.CollectorConfig) *Runner {
        }
 }
 
+// SetJobScheduler sets the job scheduler for the transport runner
+func (r *Runner) SetJobScheduler(scheduler transport.JobScheduler) {
+       r.jobScheduler = scheduler
+}
+
 // NewFromConfig creates a new transport runner from collector configuration
 func NewFromConfig(cfg *configtypes.CollectorConfig) *Runner {
        if cfg == nil {
@@ -150,7 +156,14 @@ func (r *Runner) Start(ctx context.Context) error {
                                r.Logger.Error(event.Error, "Failed to connect 
to manager gRPC server", "addr", event.Address)
                        }
                })
-               transport.RegisterDefaultProcessors(c)
+               // Register processors with job scheduler
+               if r.jobScheduler != nil {
+                       transport.RegisterDefaultProcessors(c, r.jobScheduler)
+                       r.Logger.Info("Registered gRPC processors with job 
scheduler")
+               } else {
+                       transport.RegisterDefaultProcessors(c, nil)
+                       r.Logger.Info("Registered gRPC processors without job 
scheduler")
+               }
        case *transport.NettyClient:
                c.SetEventHandler(func(event transport.Event) {
                        switch event.Type {
@@ -163,7 +176,14 @@ func (r *Runner) Start(ctx context.Context) error {
                                r.Logger.Error(event.Error, "Failed to connect 
to manager netty server", "addr", event.Address)
                        }
                })
-               transport.RegisterDefaultNettyProcessors(c)
+               // Register processors with job scheduler
+               if r.jobScheduler != nil {
+                       transport.RegisterDefaultNettyProcessors(c, 
r.jobScheduler)
+                       r.Logger.Info("Registered netty processors with job 
scheduler")
+               } else {
+                       transport.RegisterDefaultNettyProcessors(c, nil)
+                       r.Logger.Info("Registered netty processors without job 
scheduler")
+               }
        }
 
        if err := r.client.Start(); err != nil {
diff --git a/internal/collector/common/types/job/job_types.go 
b/internal/collector/common/types/job/job_types.go
index 28478b8..70c3c10 100644
--- a/internal/collector/common/types/job/job_types.go
+++ b/internal/collector/common/types/job/job_types.go
@@ -30,9 +30,9 @@ type Job struct {
        Hide                bool              `json:"hide"`
        Category            string            `json:"category"`
        App                 string            `json:"app"`
-       Name                map[string]string `json:"name"`
-       Help                map[string]string `json:"help"`
-       HelpLink            map[string]string `json:"helpLink"`
+       Name                interface{}       `json:"name"`     // Can be 
string or map[string]string for i18n
+       Help                interface{}       `json:"help"`     // Can be 
string or map[string]string for i18n
+       HelpLink            interface{}       `json:"helpLink"` // Can be 
string or map[string]string for i18n
        Timestamp           int64             `json:"timestamp"`
        DefaultInterval     int64             `json:"defaultInterval"`
        Intervals           []int64           `json:"intervals"`
@@ -41,7 +41,10 @@ type Job struct {
        Metrics             []Metrics         `json:"metrics"`
        Configmap           []Configmap       `json:"configmap"`
        IsSd                bool              `json:"isSd"`
+       Sd                  bool              `json:"sd"`
        PrometheusProxyMode bool              `json:"prometheusProxyMode"`
+       Cyclic              bool              `json:"cyclic"`
+       Interval            int64             `json:"interval"`
 
        // Internal fields
        EnvConfigmaps    map[string]Configmap `json:"-"`
diff --git a/internal/collector/common/types/job/metrics_types.go 
b/internal/collector/common/types/job/metrics_types.go
index 6127238..04f035e 100644
--- a/internal/collector/common/types/job/metrics_types.go
+++ b/internal/collector/common/types/job/metrics_types.go
@@ -17,29 +17,37 @@
 
 package job
 
-import "time"
+import (
+       "fmt"
+       "time"
+)
 
 // Metrics represents a metric configuration
 type Metrics struct {
        Name        string            `json:"name"`
+       I18n        interface{}       `json:"i18n,omitempty"` // 
Internationalization info
        Priority    int               `json:"priority"`
+       CollectTime int64             `json:"collectTime"`
+       Interval    int64             `json:"interval"`
+       Visible     *bool             `json:"visible"`
        Fields      []Field           `json:"fields"`
        Aliasfields []string          `json:"aliasfields"`
-       Calculates  []Calculate       `json:"calculates"`
-       Units       []Unit            `json:"units"`
+       AliasFields []string          `json:"aliasFields"` // Alternative field 
name
+       Calculates  interface{}       `json:"calculates"`  // Can be 
[]Calculate or []string
+       Filters     interface{}       `json:"filters"`
+       Units       interface{}       `json:"units"` // Can be []Unit or 
[]string
        Protocol    string            `json:"protocol"`
        Host        string            `json:"host"`
        Port        string            `json:"port"`
        Timeout     string            `json:"timeout"`
-       Interval    int64             `json:"interval"`
        Range       string            `json:"range"`
-       Visible     *bool             `json:"visible"`
        ConfigMap   map[string]string `json:"configMap"`
+       HasSubTask  bool              `json:"hasSubTask"`
 
        // Protocol specific fields
        HTTP    *HTTPProtocol    `json:"http,omitempty"`
        SSH     *SSHProtocol     `json:"ssh,omitempty"`
-       JDBC    *JDBCProtocol    `json:"jdbc,omitempty"`
+       JDBC    interface{}      `json:"jdbc,omitempty"` // Can be JDBCProtocol 
or map[string]interface{}
        SNMP    *SNMPProtocol    `json:"snmp,omitempty"`
        JMX     *JMXProtocol     `json:"jmx,omitempty"`
        Redis   *RedisProtocol   `json:"redis,omitempty"`
@@ -54,6 +62,7 @@ type Field struct {
        Unit     string      `json:"unit"`
        Instance bool        `json:"instance"`
        Value    interface{} `json:"value"`
+       I18n     interface{} `json:"i18n,omitempty"` // Internationalization 
info
 }
 
 // Calculate represents a calculation configuration
@@ -71,8 +80,10 @@ type Unit struct {
 
 // ParamDefine represents a parameter definition
 type ParamDefine struct {
+       ID           interface{} `json:"id"`
+       App          interface{} `json:"app"`
        Field        string      `json:"field"`
-       Name         string      `json:"name"`
+       Name         interface{} `json:"name"` // Can be string or 
map[string]string for i18n
        Type         string      `json:"type"`
        Required     bool        `json:"required"`
        DefaultValue interface{} `json:"defaultValue"`
@@ -80,8 +91,38 @@ type ParamDefine struct {
        Range        string      `json:"range"`
        Limit        int         `json:"limit"`
        Options      []Option    `json:"options"`
-       Depend       *Depend     `json:"depend"`
+       KeyAlias     interface{} `json:"keyAlias"`
+       ValueAlias   interface{} `json:"valueAlias"`
        Hide         bool        `json:"hide"`
+       Creator      interface{} `json:"creator"`
+       Modifier     interface{} `json:"modifier"`
+       GmtCreate    interface{} `json:"gmtCreate"`
+       GmtUpdate    interface{} `json:"gmtUpdate"`
+       Depend       *Depend     `json:"depend"`
+}
+
+// GetName returns the name as string, handling both string and i18n map cases
+func (p *ParamDefine) GetName() string {
+       switch v := p.Name.(type) {
+       case string:
+               return v
+       case map[string]interface{}:
+               // Try to get English name first, then any available language
+               if name, ok := v["en"]; ok {
+                       if s, ok := name.(string); ok {
+                               return s
+                       }
+               }
+               // Fall back to any available name (map iteration order is not deterministic)
+               for _, val := range v {
+                       if s, ok := val.(string); ok {
+                               return s
+                       }
+               }
+               return "unknown"
+       default:
+               return fmt.Sprintf("%v", v)
+       }
 }
 
 // Option represents a parameter option
@@ -156,18 +197,18 @@ type SSHProtocol struct {
 
 // JDBCProtocol represents JDBC protocol configuration
 type JDBCProtocol struct {
-       Host            string     `json:"host"`
-       Port            string     `json:"port"`
-       Platform        string     `json:"platform"`
-       Username        string     `json:"username"`
-       Password        string     `json:"password"`
-       Database        string     `json:"database"`
-       Timeout         string     `json:"timeout"`
-       QueryType       string     `json:"queryType"`
-       SQL             string     `json:"sql"`
-       URL             string     `json:"url"`
-       ReuseConnection string     `json:"reuseConnection"`
-       SSHTunnel       *SSHTunnel `json:"sshTunnel,omitempty"`
+       Host            string                 `json:"host"`
+       Port            string                 `json:"port"`
+       Platform        string                 `json:"platform"`
+       Username        string                 `json:"username"`
+       Password        string                 `json:"password"`
+       Database        string                 `json:"database"`
+       Timeout         string                 `json:"timeout"`
+       QueryType       string                 `json:"queryType"`
+       SQL             string                 `json:"sql"`
+       URL             string                 `json:"url"`
+       ReuseConnection string                 `json:"reuseConnection"`
+       SSHTunnel       map[string]interface{} `json:"sshTunnel,omitempty"`
 }
 
 // SNMPProtocol represents SNMP protocol configuration
@@ -270,7 +311,35 @@ func (j *Job) Clone() *Job {
 
        if j.Metrics != nil {
                clone.Metrics = make([]Metrics, len(j.Metrics))
-               copy(clone.Metrics, j.Metrics)
+               for i, metric := range j.Metrics {
+                       clone.Metrics[i] = metric
+
+                       // Deep copy ConfigMap for each metric to avoid 
concurrent access
+                       if metric.ConfigMap != nil {
+                               clone.Metrics[i].ConfigMap = 
make(map[string]string, len(metric.ConfigMap))
+                               for k, v := range metric.ConfigMap {
+                                       clone.Metrics[i].ConfigMap[k] = v
+                               }
+                       }
+
+                       // Deep copy Fields slice
+                       if metric.Fields != nil {
+                               clone.Metrics[i].Fields = make([]Field, 
len(metric.Fields))
+                               copy(clone.Metrics[i].Fields, metric.Fields)
+                       }
+
+                       // Deep copy Aliasfields slice
+                       if metric.Aliasfields != nil {
+                               clone.Metrics[i].Aliasfields = make([]string, 
len(metric.Aliasfields))
+                               copy(clone.Metrics[i].Aliasfields, 
metric.Aliasfields)
+                       }
+
+                       // Deep copy AliasFields slice
+                       if metric.AliasFields != nil {
+                               clone.Metrics[i].AliasFields = make([]string, 
len(metric.AliasFields))
+                               copy(clone.Metrics[i].AliasFields, 
metric.AliasFields)
+                       }
+               }
        }
 
        if j.Configmap != nil {
diff --git a/internal/transport/netty_client.go 
b/internal/transport/netty_client.go
index d598149..ebaeade 100644
--- a/internal/transport/netty_client.go
+++ b/internal/transport/netty_client.go
@@ -503,7 +503,7 @@ func (f *TransportClientFactory) CreateClient(protocol, 
addr string) (TransportC
 }
 
 // RegisterDefaultProcessors registers all default message processors for 
Netty client
-func RegisterDefaultNettyProcessors(client *NettyClient) {
+func RegisterDefaultNettyProcessors(client *NettyClient, scheduler 
JobScheduler) {
        client.RegisterProcessor(MessageTypeHeartbeat, func(msg interface{}) 
(interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
                        processor := &HeartbeatProcessor{}
@@ -538,7 +538,7 @@ func RegisterDefaultNettyProcessors(client *NettyClient) {
 
        client.RegisterProcessor(MessageTypeIssueCyclicTask, func(msg 
interface{}) (interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
-                       processor := &CollectCyclicDataProcessor{client: nil}
+                       processor := &CollectCyclicDataProcessor{client: nil, 
scheduler: scheduler}
                        return processor.Process(pbMsg)
                }
                return nil, fmt.Errorf("invalid message type")
@@ -546,7 +546,7 @@ func RegisterDefaultNettyProcessors(client *NettyClient) {
 
        client.RegisterProcessor(MessageTypeDeleteCyclicTask, func(msg 
interface{}) (interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
-                       processor := &DeleteCyclicTaskProcessor{client: nil}
+                       processor := &DeleteCyclicTaskProcessor{client: nil, 
scheduler: scheduler}
                        return processor.Process(pbMsg)
                }
                return nil, fmt.Errorf("invalid message type")
@@ -554,7 +554,7 @@ func RegisterDefaultNettyProcessors(client *NettyClient) {
 
        client.RegisterProcessor(MessageTypeIssueOneTimeTask, func(msg 
interface{}) (interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
-                       processor := &CollectOneTimeDataProcessor{client: nil}
+                       processor := &CollectOneTimeDataProcessor{client: nil, 
scheduler: scheduler}
                        return processor.Process(pbMsg)
                }
                return nil, fmt.Errorf("invalid message type")
diff --git a/internal/transport/processors.go b/internal/transport/processors.go
index 8bd71e3..b9df9a3 100644
--- a/internal/transport/processors.go
+++ b/internal/transport/processors.go
@@ -18,11 +18,18 @@
 package transport
 
 import (
+       "encoding/json"
        "fmt"
        "log"
+       "os"
        "time"
 
        pb "hertzbeat.apache.org/hertzbeat-collector-go/api"
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/crypto"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/param"
 )
 
 // Message type constants matching Java version
@@ -44,6 +51,23 @@ type MessageProcessor interface {
        Process(msg *pb.Message) (*pb.Message, error)
 }
 
+// JobScheduler defines the interface for job scheduling operations
+// This interface allows transport layer to interact with job scheduling 
without direct dependencies
+type JobScheduler interface {
+       // AddAsyncCollectJob adds a job to async collection scheduling
+       AddAsyncCollectJob(job *jobtypes.Job) error
+
+       // RemoveAsyncCollectJob removes a job from scheduling
+       RemoveAsyncCollectJob(jobID int64) error
+}
+
+// preprocessJobPasswords is a helper function to decrypt passwords in job 
configmap
+// This should be called once when receiving a job to avoid repeated decryption
+func preprocessJobPasswords(job *jobtypes.Job) error {
+       replacer := param.NewReplacer()
+       return replacer.PreprocessJobPasswords(job)
+}
+
 // HeartbeatProcessor handles heartbeat messages
 type HeartbeatProcessor struct{}
 
@@ -63,7 +87,31 @@ func NewGoOnlineProcessor(client *GrpcClient) 
*GoOnlineProcessor {
 }
 
 func (p *GoOnlineProcessor) Process(msg *pb.Message) (*pb.Message, error) {
-       // Handle go online message
+       log := logger.DefaultLogger(os.Stdout, 
loggertype.LogLevelInfo).WithName("go-online-processor")
+
+       // Handle go online message - parse ServerInfo and extract AES secret
+       log.Info("received GO_ONLINE message from manager")
+
+       if len(msg.Msg) == 0 {
+               log.Info("empty message from server, please upgrade server")
+       } else {
+               // Parse ServerInfo from message (matches Java: 
JsonUtil.fromJson(message.getMsg().toStringUtf8(), ServerInfo.class))
+               var serverInfo struct {
+                       AesSecret string `json:"aesSecret"`
+               }
+
+               if err := json.Unmarshal(msg.Msg, &serverInfo); err != nil {
+                       log.Error(err, "failed to parse ServerInfo from 
manager")
+               } else if serverInfo.AesSecret == "" {
+                       log.Info("server response has empty secret, please 
check configuration")
+               } else {
+                       // Set the AES secret key globally (matches Java: 
AesUtil.setDefaultSecretKey(serverInfo.getAesSecret()))
+                       log.Info("received AES secret from manager", 
"keyLength", len(serverInfo.AesSecret))
+                       crypto.SetDefaultSecretKey(serverInfo.AesSecret)
+               }
+       }
+
+       log.Info("online message processed successfully")
        return &pb.Message{
                Type:      pb.MessageType_GO_ONLINE,
                Direction: pb.Direction_RESPONSE,
@@ -128,65 +176,238 @@ func (p *GoCloseProcessor) Process(msg *pb.Message) 
(*pb.Message, error) {
 
 // CollectCyclicDataProcessor handles cyclic task messages
 type CollectCyclicDataProcessor struct {
-       client *GrpcClient
+       client    *GrpcClient
+       scheduler JobScheduler
 }
 
-func NewCollectCyclicDataProcessor(client *GrpcClient) 
*CollectCyclicDataProcessor {
-       return &CollectCyclicDataProcessor{client: client}
+func NewCollectCyclicDataProcessor(client *GrpcClient, scheduler JobScheduler) 
*CollectCyclicDataProcessor {
+       return &CollectCyclicDataProcessor{
+               client:    client,
+               scheduler: scheduler,
+       }
 }
 
 func (p *CollectCyclicDataProcessor) Process(msg *pb.Message) (*pb.Message, 
error) {
-       // Handle cyclic task message
-       // TODO: Implement actual task processing logic
+       log := logger.DefaultLogger(os.Stdout, 
loggertype.LogLevelInfo).WithName("cyclic-task-processor")
+       log.Info("processing cyclic task message", "identity", msg.Identity)
+
+       if p.scheduler == nil {
+               log.Info("no job scheduler available, cannot process cyclic 
task")
+               return &pb.Message{
+                       Type:      pb.MessageType_ISSUE_CYCLIC_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  msg.Identity,
+                       Msg:       []byte("no job scheduler available"),
+               }, nil
+       }
+
+       // Parse job from message
+       log.Info("Received cyclic task JSON: %s", string(msg.Msg))
+
+       var job jobtypes.Job
+       if err := json.Unmarshal(msg.Msg, &job); err != nil {
+               log.Error(err, "Failed to unmarshal job from cyclic task 
message: %v")
+               return &pb.Message{
+                       Type:      pb.MessageType_ISSUE_CYCLIC_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  msg.Identity,
+                       Msg:       []byte(fmt.Sprintf("failed to parse job: 
%v", err)),
+               }, nil
+       }
+
+       // Preprocess passwords once (decrypt encrypted passwords in configmap)
+       // This avoids repeated decryption during each task execution
+       if err := preprocessJobPasswords(&job); err != nil {
+               log.Error(err, "Failed to preprocess job passwords")
+       }
+
+       // Ensure job is marked as cyclic
+       job.IsCyclic = true
+
+       log.Info("Adding cyclic job to scheduler",
+               "jobID", job.ID,
+               "monitorID", job.MonitorID,
+               "app", job.App)
+
+       // Add job to scheduler
+       if err := p.scheduler.AddAsyncCollectJob(&job); err != nil {
+               log.Error(err, "Failed to add cyclic job to scheduler: %v")
+               return &pb.Message{
+                       Type:      pb.MessageType_ISSUE_CYCLIC_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  msg.Identity,
+                       Msg:       []byte(fmt.Sprintf("failed to schedule job: 
%v", err)),
+               }, nil
+       }
+
        return &pb.Message{
                Type:      pb.MessageType_ISSUE_CYCLIC_TASK,
                Direction: pb.Direction_RESPONSE,
                Identity:  msg.Identity,
-               Msg:       []byte("cyclic task ack"),
+               Msg:       []byte("cyclic task scheduled successfully"),
        }, nil
 }
 
 // DeleteCyclicTaskProcessor handles delete cyclic task messages
 type DeleteCyclicTaskProcessor struct {
-       client *GrpcClient
+       client    *GrpcClient
+       scheduler JobScheduler
 }
 
-func NewDeleteCyclicTaskProcessor(client *GrpcClient) 
*DeleteCyclicTaskProcessor {
-       return &DeleteCyclicTaskProcessor{client: client}
+func NewDeleteCyclicTaskProcessor(client *GrpcClient, scheduler JobScheduler) 
*DeleteCyclicTaskProcessor {
+       return &DeleteCyclicTaskProcessor{
+               client:    client,
+               scheduler: scheduler,
+       }
 }
 
 func (p *DeleteCyclicTaskProcessor) Process(msg *pb.Message) (*pb.Message, 
error) {
-       // Handle delete cyclic task message
+       log.Printf("Processing delete cyclic task message, identity: %s", 
msg.Identity)
+
+       if p.scheduler == nil {
+               log.Printf("No job scheduler available, cannot process delete 
cyclic task")
+               return &pb.Message{
+                       Type:      pb.MessageType_DELETE_CYCLIC_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  msg.Identity,
+                       Msg:       []byte("no job scheduler available"),
+               }, nil
+       }
+
+       // Parse job IDs from message - could be a single job ID or array of IDs
+       var jobIDs []int64
+
+       // Try to parse as array first
+       if err := json.Unmarshal(msg.Msg, &jobIDs); err != nil {
+               // If array parsing fails, try single job ID
+               var singleJobID int64
+               if err2 := json.Unmarshal(msg.Msg, &singleJobID); err2 != nil {
+                       log.Printf("Failed to unmarshal job IDs from delete 
message: %v, %v", err, err2)
+                       return &pb.Message{
+                               Type:      pb.MessageType_DELETE_CYCLIC_TASK,
+                               Direction: pb.Direction_RESPONSE,
+                               Identity:  msg.Identity,
+                               Msg:       []byte(fmt.Sprintf("failed to parse 
job IDs: %v", err)),
+                       }, nil
+               }
+               jobIDs = []int64{singleJobID}
+       }
+
+       log.Printf("Removing %d cyclic jobs from scheduler: %v", len(jobIDs), 
jobIDs)
+
+       // Remove jobs from scheduler
+       var errors []string
+       for _, jobID := range jobIDs {
+               if err := p.scheduler.RemoveAsyncCollectJob(jobID); err != nil {
+                       log.Printf("Failed to remove job %d from scheduler: 
%v", jobID, err)
+                       errors = append(errors, fmt.Sprintf("job %d: %v", 
jobID, err))
+               } else {
+                       log.Printf("Successfully removed job %d from 
scheduler", jobID)
+               }
+       }
+
+       // Prepare response message
+       var responseMsg string
+       if len(errors) > 0 {
+               responseMsg = fmt.Sprintf("partially completed, errors: %v", 
errors)
+       } else {
+               responseMsg = "all cyclic tasks removed successfully"
+       }
+
        return &pb.Message{
                Type:      pb.MessageType_DELETE_CYCLIC_TASK,
                Direction: pb.Direction_RESPONSE,
                Identity:  msg.Identity,
-               Msg:       []byte("delete cyclic task ack"),
+               Msg:       []byte(responseMsg),
        }, nil
 }
 
 // CollectOneTimeDataProcessor handles one-time task messages
 type CollectOneTimeDataProcessor struct {
-       client *GrpcClient
+       client    *GrpcClient
+       scheduler JobScheduler
 }
 
-func NewCollectOneTimeDataProcessor(client *GrpcClient) 
*CollectOneTimeDataProcessor {
-       return &CollectOneTimeDataProcessor{client: client}
+func NewCollectOneTimeDataProcessor(client *GrpcClient, scheduler 
JobScheduler) *CollectOneTimeDataProcessor {
+       return &CollectOneTimeDataProcessor{
+               client:    client,
+               scheduler: scheduler,
+       }
 }
 
 func (p *CollectOneTimeDataProcessor) Process(msg *pb.Message) (*pb.Message, 
error) {
-       // Handle one-time task message
-       // TODO: Implement actual task processing logic
+       log.Printf("Processing one-time task message, identity: %s", 
msg.Identity)
+
+       if p.scheduler == nil {
+               log.Printf("No job scheduler available, cannot process one-time 
task")
+               return &pb.Message{
+                       Type:      pb.MessageType_ISSUE_ONE_TIME_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  msg.Identity,
+                       Msg:       []byte("no job scheduler available"),
+               }, nil
+       }
+
+       // Parse job from message
+       log.Printf("Received one-time task JSON: %s", string(msg.Msg))
+
+       // Parse JSON to check interval values before unmarshaling
+       var rawJob map[string]interface{}
+       if err := json.Unmarshal(msg.Msg, &rawJob); err == nil {
+               if defaultInterval, ok := rawJob["defaultInterval"]; ok {
+                       log.Printf("DEBUG: Raw defaultInterval from JSON: %v 
(type: %T)", defaultInterval, defaultInterval)
+               }
+               if interval, ok := rawJob["interval"]; ok {
+                       log.Printf("DEBUG: Raw interval from JSON: %v (type: 
%T)", interval, interval)
+               }
+       }
+
+       var job jobtypes.Job
+       if err := json.Unmarshal(msg.Msg, &job); err != nil {
+               log.Printf("Failed to unmarshal job from one-time task message: 
%v", err)
+               log.Printf("JSON content: %s", string(msg.Msg))
+               return &pb.Message{
+                       Type:      pb.MessageType_ISSUE_ONE_TIME_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  msg.Identity,
+                       Msg:       []byte(fmt.Sprintf("failed to parse job: 
%v", err)),
+               }, nil
+       }
+
+       // Preprocess passwords once (decrypt encrypted passwords in configmap)
+       // This avoids repeated decryption during each task execution
+       if err := preprocessJobPasswords(&job); err != nil {
+               log.Printf("Failed to preprocess job passwords: %v", err)
+       }
+
+       // Ensure job is marked as non-cyclic
+       job.IsCyclic = false
+
+       log.Printf("Adding one-time job to scheduler: jobID=%d, monitorID=%d, 
app=%s",
+               job.ID, job.MonitorID, job.App)
+
+       // Add job to scheduler
+       if err := p.scheduler.AddAsyncCollectJob(&job); err != nil {
+               log.Printf("Failed to add one-time job to scheduler: %v", err)
+               return &pb.Message{
+                       Type:      pb.MessageType_ISSUE_ONE_TIME_TASK,
+                       Direction: pb.Direction_RESPONSE,
+                       Identity:  msg.Identity,
+                       Msg:       []byte(fmt.Sprintf("failed to schedule job: 
%v", err)),
+               }, nil
+       }
+
+       log.Printf("Successfully scheduled one-time job: jobID=%d", job.ID)
        return &pb.Message{
                Type:      pb.MessageType_ISSUE_ONE_TIME_TASK,
                Direction: pb.Direction_RESPONSE,
                Identity:  msg.Identity,
-               Msg:       []byte("one-time task ack"),
+               Msg:       []byte("one-time task scheduled successfully"),
        }, nil
 }
 
 // RegisterDefaultProcessors registers all default message processors
-func RegisterDefaultProcessors(client *GrpcClient) {
+func RegisterDefaultProcessors(client *GrpcClient, scheduler JobScheduler) {
        client.RegisterProcessor(MessageTypeHeartbeat, func(msg interface{}) 
(interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
                        processor := &HeartbeatProcessor{}
@@ -221,7 +442,7 @@ func RegisterDefaultProcessors(client *GrpcClient) {
 
        client.RegisterProcessor(MessageTypeIssueCyclicTask, func(msg 
interface{}) (interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
-                       processor := NewCollectCyclicDataProcessor(client)
+                       processor := NewCollectCyclicDataProcessor(client, 
scheduler)
                        return processor.Process(pbMsg)
                }
                return nil, fmt.Errorf("invalid message type")
@@ -229,7 +450,7 @@ func RegisterDefaultProcessors(client *GrpcClient) {
 
        client.RegisterProcessor(MessageTypeDeleteCyclicTask, func(msg 
interface{}) (interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
-                       processor := NewDeleteCyclicTaskProcessor(client)
+                       processor := NewDeleteCyclicTaskProcessor(client, 
scheduler)
                        return processor.Process(pbMsg)
                }
                return nil, fmt.Errorf("invalid message type")
@@ -237,7 +458,7 @@ func RegisterDefaultProcessors(client *GrpcClient) {
 
        client.RegisterProcessor(MessageTypeIssueOneTimeTask, func(msg 
interface{}) (interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
-                       processor := NewCollectOneTimeDataProcessor(client)
+                       processor := NewCollectOneTimeDataProcessor(client, 
scheduler)
                        return processor.Process(pbMsg)
                }
                return nil, fmt.Errorf("invalid message type")
diff --git a/internal/util/arrow/arrow_serializer.go 
b/internal/util/arrow/arrow_serializer.go
new file mode 100644
index 0000000..1915b43
--- /dev/null
+++ b/internal/util/arrow/arrow_serializer.go
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package arrow
+
import (
	"bytes"
	"encoding/binary"
	"encoding/json"
	"fmt"

	jobtypes "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
	"hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
)
+
+// ArrowSerializer handles Arrow format serialization for metrics data
+type ArrowSerializer struct {
+       logger logger.Logger
+}
+
+// NewArrowSerializer creates a new Arrow serializer
+func NewArrowSerializer(logger logger.Logger) *ArrowSerializer {
+       return &ArrowSerializer{
+               logger: logger.WithName("arrow-serializer"),
+       }
+}
+
+// SerializeMetricsData serializes metrics data to Arrow format
+// For now, we'll use a compatible format that Manager can handle
+func (as *ArrowSerializer) SerializeMetricsData(dataList 
[]*jobtypes.CollectRepMetricsData) ([]byte, error) {
+       if len(dataList) == 0 {
+               return nil, fmt.Errorf("empty data list")
+       }
+
+       // For compatibility with Java Manager, we need to create a format that
+       // ArrowUtil.deserializeMetricsData() can handle
+
+       // First, let's try to create a simple Arrow-compatible format
+       // Since we don't have full Arrow library, we'll create a minimal 
implementation
+
+       // Convert to a format similar to what Java expects
+       arrowCompatibleData := make([]map[string]interface{}, len(dataList))
+
+       for i, data := range dataList {
+               // Create Arrow-like structure
+               arrowData := map[string]interface{}{
+                       "id":       data.ID,
+                       "tenantId": data.TenantID,
+                       "app":      data.App,
+                       "metrics":  data.Metrics,
+                       "priority": data.Priority,
+                       "time":     data.Time,
+                       "code":     data.Code,
+                       "msg":      data.Msg,
+               }
+
+               // Convert Values to Arrow-compatible format
+               if len(data.Values) > 0 {
+                       // Create column-based structure like Arrow
+                       columns := make(map[string][]interface{})
+
+                       for _, valueRow := range data.Values {
+                               for i, value := range valueRow.Columns {
+                                       if i < len(data.Fields) {
+                                               fieldName := 
data.Fields[i].Field
+                                               if columns[fieldName] == nil {
+                                                       columns[fieldName] = 
make([]interface{}, 0, len(data.Values))
+                                               }
+                                               columns[fieldName] = 
append(columns[fieldName], value)
+                                       }
+                               }
+                       }
+
+                       arrowData["columns"] = columns
+                       arrowData["rowCount"] = len(data.Values)
+               }
+
+               arrowCompatibleData[i] = arrowData
+       }
+
+       // For now, serialize as JSON but with Arrow-like structure
+       // TODO: Implement proper Arrow serialization when Arrow library is 
available
+       jsonBytes, err := json.Marshal(arrowCompatibleData)
+       if err != nil {
+               as.logger.Error(err, "failed to serialize arrow-compatible 
data")
+               return nil, fmt.Errorf("failed to serialize arrow-compatible 
data: %w", err)
+       }
+
+       // Create a minimal Arrow-like binary format
+       // This is a temporary solution until we have proper Arrow support
+       return as.createArrowLikeBinary(jsonBytes)
+}
+
+// createArrowLikeBinary creates a binary format that might be compatible with 
Arrow reader
+func (as *ArrowSerializer) createArrowLikeBinary(jsonData []byte) ([]byte, 
error) {
+       // Create a simple binary format that includes:
+       // 1. Magic bytes (Arrow-like)
+       // 2. Schema information
+       // 3. Data
+
+       var buffer bytes.Buffer
+
+       // Write Arrow-like magic bytes
+       // This is a simplified version - real Arrow has specific magic bytes
+       magic := []byte("ARROW1\x00\x00")
+       buffer.Write(magic)
+
+       // Write schema length (placeholder)
+       schemaLen := uint32(0) // No schema for now
+       buffer.Write([]byte{byte(schemaLen), byte(schemaLen >> 8), 
byte(schemaLen >> 16), byte(schemaLen >> 24)})
+
+       // Write data length
+       dataLen := uint32(len(jsonData))
+       buffer.Write([]byte{byte(dataLen), byte(dataLen >> 8), byte(dataLen >> 
16), byte(dataLen >> 24)})
+
+       // Write the JSON data (as a temporary measure)
+       buffer.Write(jsonData)
+
+       as.logger.V(1).Info("created arrow-like binary format",
+               "originalSize", len(jsonData),
+               "binarySize", buffer.Len())
+
+       return buffer.Bytes(), nil
+}
+
+// FallbackToJSON provides JSON serialization as fallback
+func (as *ArrowSerializer) FallbackToJSON(dataList 
[]*jobtypes.CollectRepMetricsData) ([]byte, error) {
+       as.logger.Info("falling back to JSON serialization", "dataCount", 
len(dataList))
+
+       jsonBytes, err := json.Marshal(dataList)
+       if err != nil {
+               return nil, fmt.Errorf("failed to serialize to JSON: %w", err)
+       }
+
+       return jsonBytes, nil
+}
diff --git a/internal/util/crypto/aes_util.go b/internal/util/crypto/aes_util.go
new file mode 100644
index 0000000..678760f
--- /dev/null
+++ b/internal/util/crypto/aes_util.go
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package crypto
+
+import (
+       "crypto/aes"
+       "crypto/cipher"
+       "encoding/base64"
+       "fmt"
+       "os"
+       "sync"
+       "unicode"
+
+       loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
// AESUtil provides AES encryption/decryption utilities.
// This matches Java's AesUtil functionality: a process-wide secret key
// (pushed by the manager via GO_ONLINE) guarded by a read/write mutex.
type AESUtil struct {
	secretKey string       // current secret key; AesDecodeWithKey requires exactly 16 bytes (AES-128)
	mutex     sync.RWMutex // guards secretKey for concurrent readers/writers
}

var (
	// defaultAESUtil is the package-wide singleton used by the top-level
	// AesDecode / SetDefaultSecretKey convenience functions.
	defaultAESUtil = &AESUtil{}
	once           sync.Once
	log            = logger.DefaultLogger(os.Stdout, loggertype.LogLevelInfo).WithName("aes-util")
)
+
+// GetDefaultAESUtil returns the singleton AES utility instance
+func GetDefaultAESUtil() *AESUtil {
+       once.Do(func() {
+               defaultAESUtil = &AESUtil{}
+       })
+       return defaultAESUtil
+}
+
+// SetDefaultSecretKey sets the default AES secret key (matches Java: 
AesUtil.setDefaultSecretKey)
+func SetDefaultSecretKey(secretKey string) {
+       GetDefaultAESUtil().SetSecretKey(secretKey)
+}
+
+// GetDefaultSecretKey gets the current AES secret key (matches Java: 
AesUtil.getDefaultSecretKey)
+func GetDefaultSecretKey() string {
+       return GetDefaultAESUtil().GetSecretKey()
+}
+
// SetSecretKey sets the AES secret key for this instance. Safe for concurrent
// use; only the key length is logged, never the key material itself.
func (a *AESUtil) SetSecretKey(secretKey string) {
	a.mutex.Lock()
	defer a.mutex.Unlock()
	a.secretKey = secretKey
	log.Info("AES secret key updated", "keyLength", len(secretKey))
}

// GetSecretKey gets the AES secret key for this instance. Safe for concurrent
// use (shared read lock).
func (a *AESUtil) GetSecretKey() string {
	a.mutex.RLock()
	defer a.mutex.RUnlock()
	return a.secretKey
}
+
// AesDecode decrypts AES encrypted data using this instance's current secret
// key (matches Java: AesUtil.aesDecode). Returns an error if no key has been
// set yet (i.e. before the manager's GO_ONLINE handshake delivered one).
func (a *AESUtil) AesDecode(encryptedData string) (string, error) {
	secretKey := a.GetSecretKey()
	if secretKey == "" {
		return "", fmt.Errorf("AES secret key not set")
	}
	return a.AesDecodeWithKey(encryptedData, secretKey)
}
+
// AesDecodeWithKey decrypts AES encrypted data with the specified key.
//
// Compatibility contract (must match the Java manager's AesUtil exactly):
//   - AES-128 in CBC mode; the key must be exactly 16 bytes
//   - the key bytes double as the IV - a known cryptographic weakness, kept
//     deliberately so Go and Java produce/consume the same ciphertexts
//   - input is standard base64; padding is PKCS5/PKCS7
//
// Returns the decrypted plaintext, or an error describing which stage failed.
func (a *AESUtil) AesDecodeWithKey(encryptedData, key string) (string, error) {
	// Ensure key is exactly 16 bytes for AES-128 (Java requirement)
	keyBytes := []byte(key)
	if len(keyBytes) != 16 {
		return "", fmt.Errorf("key must be exactly 16 bytes, got %d", len(keyBytes))
	}

	// Decode base64 encoded data
	encryptedBytes, err := base64.StdEncoding.DecodeString(encryptedData)
	if err != nil {
		return "", fmt.Errorf("failed to decode base64: %w", err)
	}

	// Create AES cipher
	block, err := aes.NewCipher(keyBytes)
	if err != nil {
		return "", fmt.Errorf("failed to create AES cipher: %w", err)
	}

	// CBC requires the ciphertext to be a non-empty multiple of the block size
	if len(encryptedBytes) < aes.BlockSize || len(encryptedBytes)%aes.BlockSize != 0 {
		return "", fmt.Errorf("invalid encrypted data length: %d", len(encryptedBytes))
	}

	// Java uses the key itself as IV (this matches Java's implementation)
	iv := keyBytes

	// Create CBC decrypter
	mode := cipher.NewCBCDecrypter(block, iv)

	// Decrypt into a buffer of the same length; padding is still attached
	decrypted := make([]byte, len(encryptedBytes))
	mode.CryptBlocks(decrypted, encryptedBytes)

	// Remove PKCS5/PKCS7 padding (Go's PKCS7 is compatible with Java's PKCS5 for AES)
	decrypted, err = removePKCS7Padding(decrypted)
	if err != nil {
		return "", fmt.Errorf("failed to remove padding: %w", err)
	}

	return string(decrypted), nil
}
+
// IsCiphertext determines whether text is encrypted (matches Java:
// AesUtil.isCiphertext). Heuristic: the text must be valid base64 AND decrypt
// successfully with the current key.
//
// NOTE(review): this can misclassify - a plaintext that happens to be valid
// base64 of a block-aligned length may decrypt "successfully" to garbage,
// since removePKCS7Padding tolerates invalid padding. This mirrors the Java
// behavior; confirm before tightening.
func (a *AESUtil) IsCiphertext(text string) bool {
	// First check if it's valid base64
	if _, err := base64.StdEncoding.DecodeString(text); err != nil {
		return false
	}

	// Try to decrypt and see if it succeeds
	_, err := a.AesDecode(text)
	return err == nil
}
+
// removePKCS7Padding strips PKCS7 padding from decrypted data. Invalid-looking
// padding is tolerated: the input is returned unchanged (with a nil error)
// rather than rejected, preserving the lenient contract the decrypt path
// relies on. The error return is always nil today but kept for the caller's
// signature.
func removePKCS7Padding(data []byte) ([]byte, error) {
	if len(data) == 0 {
		return data, nil
	}

	// The last byte encodes how many padding bytes were appended.
	padLen := int(data[len(data)-1])
	if padLen == 0 || padLen > len(data) {
		// Out-of-range pad byte: treat the data as unpadded.
		return data, nil
	}

	// Every one of the padLen trailing bytes must equal the pad length itself.
	for _, b := range data[len(data)-padLen:] {
		if b != byte(padLen) {
			return data, nil
		}
	}

	return data[:len(data)-padLen], nil
}
+
// isPrintableString reports whether s is non-empty and every rune in it is
// either printable or whitespace.
func isPrintableString(s string) bool {
	if s == "" {
		return false
	}

	for _, r := range s {
		if unicode.IsPrint(r) || unicode.IsSpace(r) {
			continue
		}
		return false
	}
	return true
}
+
+// Convenience functions that match Java's static methods
+
+// AesDecode decrypts with the default secret key
+func AesDecode(encryptedData string) (string, error) {
+       return GetDefaultAESUtil().AesDecode(encryptedData)
+}
+
+// AesDecodeWithKey decrypts with specified key
+func AesDecodeWithKey(encryptedData, key string) (string, error) {
+       return GetDefaultAESUtil().AesDecodeWithKey(encryptedData, key)
+}
+
+// IsCiphertext checks if text is encrypted using default key
+func IsCiphertext(text string) bool {
+       return GetDefaultAESUtil().IsCiphertext(text)
+}
diff --git a/internal/util/param/param_replacer.go 
b/internal/util/param/param_replacer.go
new file mode 100644
index 0000000..ff6a944
--- /dev/null
+++ b/internal/util/param/param_replacer.go
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package param
+
+import (
+       "encoding/json"
+       "fmt"
+       "os"
+       "strconv"
+       "strings"
+
+       jobtypes 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/job"
+       loggertype 
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/types/logger"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/crypto"
+       "hertzbeat.apache.org/hertzbeat-collector-go/internal/util/logger"
+)
+
// Replacer is a standalone utility for parameter replacement.
// It mirrors the Java manager's substitution mechanism, in which
// ^_^paramName^_^ placeholders are replaced with actual parameter values.
// The zero value is usable; it carries no state.
type Replacer struct{}

// NewReplacer creates a new parameter replacer.
func NewReplacer() *Replacer {
	return &Replacer{}
}
+
+// PreprocessJobPasswords decrypts passwords in job configmap once during job 
creation
+// This permanently replaces encrypted passwords with decrypted ones in the 
job configmap
+func (r *Replacer) PreprocessJobPasswords(job *jobtypes.Job) error {
+       if job == nil || job.Configmap == nil {
+               return nil
+       }
+
+       log := logger.DefaultLogger(os.Stdout, 
loggertype.LogLevelDebug).WithName("password-preprocessor")
+
+       // Decrypt passwords in configmap once and permanently replace them
+       for i := range job.Configmap {
+               config := &job.Configmap[i] // Get pointer to modify in place
+               if config.Type == 2 {       // password type
+                       if encryptedValue, ok := config.Value.(string); ok {
+                               log.Sugar().Debugf("preprocessing encrypted 
password for key: %s", config.Key)
+                               if decoded, err := 
r.decryptPassword(encryptedValue); err == nil {
+                                       log.Info("password preprocessing 
successful", "key", config.Key, "length", len(decoded))
+                                       config.Value = decoded // Permanently 
replace encrypted value with decrypted value
+                                       config.Type = 1        // Change type 
to string since it's now decrypted
+                               } else {
+                                       log.Error(err, "password preprocessing 
failed, keeping original value", "key", config.Key)
+                               }
+                       }
+               }
+       }
+       return nil
+}
+
+// ReplaceJobParams replaces all parameter placeholders in job configuration
+func (r *Replacer) ReplaceJobParams(job *jobtypes.Job) (*jobtypes.Job, error) {
+       if job == nil {
+               return nil, fmt.Errorf("job is nil")
+       }
+
+       // Create parameter map from configmap (passwords should already be 
decrypted)
+       paramMap := r.createParamMapSimple(job.Configmap)
+
+       // Clone the job to avoid modifying original
+       clonedJob := job.Clone()
+       if clonedJob == nil {
+               return nil, fmt.Errorf("failed to clone job")
+       }
+
+       // Replace parameters in all metrics
+       for i := range clonedJob.Metrics {
+               if err := r.replaceMetricsParams(&clonedJob.Metrics[i], 
paramMap); err != nil {
+                       return nil, fmt.Errorf("failed to replace params in 
metrics %s: %w", clonedJob.Metrics[i].Name, err)
+               }
+       }
+
+       return clonedJob, nil
+}
+
+// createParamMapSimple creates a parameter map from configmap entries
+// Assumes passwords have already been decrypted by PreprocessJobPasswords
+func (r *Replacer) createParamMapSimple(configmap []jobtypes.Configmap) 
map[string]string {
+       paramMap := make(map[string]string)
+
+       for _, config := range configmap {
+               // Convert value to string, handle null values as empty strings
+               var strValue string
+               if config.Value == nil {
+                       strValue = "" // null values become empty strings
+               } else {
+                       switch config.Type {
+                       case 0: // number
+                               if numVal, ok := config.Value.(float64); ok {
+                                       strValue = strconv.FormatFloat(numVal, 
'f', -1, 64)
+                               } else if intVal, ok := config.Value.(int); ok {
+                                       strValue = strconv.Itoa(intVal)
+                               } else {
+                                       strValue = fmt.Sprintf("%v", 
config.Value)
+                               }
+                       case 1, 2: // string or password (both are now plain 
strings after preprocessing)
+                               strValue = fmt.Sprintf("%v", config.Value)
+                       default:
+                               strValue = fmt.Sprintf("%v", config.Value)
+                       }
+               }
+               paramMap[config.Key] = strValue
+       }
+
+       return paramMap
+}
+
+// createParamMap creates a parameter map from configmap entries (legacy 
version with decryption)
+// Deprecated: Use PreprocessJobPasswords + createParamMapSafe instead
+func (r *Replacer) createParamMap(configmap []jobtypes.Configmap) 
map[string]string {
+       paramMap := make(map[string]string)
+
+       for _, config := range configmap {
+               // Convert value to string based on type, handle null values as 
empty strings
+               var strValue string
+               if config.Value == nil {
+                       strValue = "" // null values become empty strings
+               } else {
+                       switch config.Type {
+                       case 0: // number
+                               if numVal, ok := config.Value.(float64); ok {
+                                       strValue = strconv.FormatFloat(numVal, 
'f', -1, 64)
+                               } else if intVal, ok := config.Value.(int); ok {
+                                       strValue = strconv.Itoa(intVal)
+                               } else {
+                                       strValue = fmt.Sprintf("%v", 
config.Value)
+                               }
+                       case 1: // string
+                               strValue = fmt.Sprintf("%v", config.Value)
+                       case 2: // password (encrypted)
+                               if encryptedValue, ok := config.Value.(string); 
ok {
+                                       log := logger.DefaultLogger(os.Stdout, 
loggertype.LogLevelDebug).WithName("param-replacer")
+                                       log.Sugar().Debugf("attempting to 
decrypt password: %s", encryptedValue)
+                                       if decoded, err := 
r.decryptPassword(encryptedValue); err == nil {
+                                               log.Info("password decryption 
successful", "length", len(decoded))
+                                               strValue = decoded
+                                       } else {
+                                               log.Error(err, "password 
decryption failed, using original value")
+                                               // Fallback to original value 
if decryption fails
+                                               strValue = encryptedValue
+                                       }
+                               } else {
+                                       strValue = fmt.Sprintf("%v", 
config.Value)
+                               }
+                       default:
+                               strValue = fmt.Sprintf("%v", config.Value)
+                       }
+               }
+               paramMap[config.Key] = strValue
+       }
+
+       return paramMap
+}
+
+// replaceMetricsParams replaces parameters in metrics configuration
+func (r *Replacer) replaceMetricsParams(metrics *jobtypes.Metrics, paramMap 
map[string]string) error {
+       // Replace parameters in protocol-specific configurations
+       // Currently only JDBC is defined as interface{} in the struct 
definition
+       // Other protocols are concrete struct pointers and don't contain 
placeholders from JSON
+       if metrics.JDBC != nil {
+               if err := r.replaceProtocolParams(&metrics.JDBC, paramMap); err 
!= nil {
+                       return fmt.Errorf("failed to replace JDBC params: %w", 
err)
+               }
+       }
+
+       // TODO: Add other protocol configurations as they are converted to 
interface{} types
+       // For now, other protocols (HTTP, SSH, etc.) are concrete struct 
pointers and
+       // don't require parameter replacement
+
+       // Replace parameters in basic metrics fields
+       if err := r.replaceBasicMetricsParams(metrics, paramMap); err != nil {
+               return fmt.Errorf("failed to replace basic metrics params: %w", 
err)
+       }
+
+       return nil
+}
+
+// replaceProtocolParams replaces parameters in any protocol configuration
+func (r *Replacer) replaceProtocolParams(protocolInterface *interface{}, 
paramMap map[string]string) error {
+       if *protocolInterface == nil {
+               return nil
+       }
+
+       // Convert protocol interface{} to map for manipulation
+       protocolMap, ok := (*protocolInterface).(map[string]interface{})
+       if !ok {
+               // If it's already processed or not a map, skip
+               return nil
+       }
+
+       // Recursively replace parameters in all string values
+       return r.replaceParamsInMap(protocolMap, paramMap)
+}
+
+// replaceParamsInMap recursively replaces parameters in a map structure
+func (r *Replacer) replaceParamsInMap(data map[string]interface{}, paramMap 
map[string]string) error {
+       for key, value := range data {
+               switch v := value.(type) {
+               case string:
+                       // Replace parameters in string values
+                       data[key] = r.replaceParamPlaceholders(v, paramMap)
+               case map[string]interface{}:
+                       // Recursively handle nested maps
+                       if err := r.replaceParamsInMap(v, paramMap); err != nil 
{
+                               return fmt.Errorf("failed to replace params in 
nested map %s: %w", key, err)
+                       }
+               case []interface{}:
+                       // Handle arrays
+                       for i, item := range v {
+                               if itemMap, ok := 
item.(map[string]interface{}); ok {
+                                       if err := r.replaceParamsInMap(itemMap, 
paramMap); err != nil {
+                                               return fmt.Errorf("failed to 
replace params in array item %d: %w", i, err)
+                                       }
+                               } else if itemStr, ok := item.(string); ok {
+                                       v[i] = 
r.replaceParamPlaceholders(itemStr, paramMap)
+                               }
+                       }
+               }
+       }
+       return nil
+}
+
+// replaceBasicMetricsParams replaces parameters in basic metrics fields
+func (r *Replacer) replaceBasicMetricsParams(metrics *jobtypes.Metrics, 
paramMap map[string]string) error {
+       // Replace parameters in basic string fields
+       metrics.Host = r.replaceParamPlaceholders(metrics.Host, paramMap)
+       metrics.Port = r.replaceParamPlaceholders(metrics.Port, paramMap)
+       metrics.Timeout = r.replaceParamPlaceholders(metrics.Timeout, paramMap)
+       metrics.Range = r.replaceParamPlaceholders(metrics.Range, paramMap)
+
+       // Replace parameters in ConfigMap
+       if metrics.ConfigMap != nil {
+               for key, value := range metrics.ConfigMap {
+                       metrics.ConfigMap[key] = 
r.replaceParamPlaceholders(value, paramMap)
+               }
+       }
+
+       return nil
+}
+
+// replaceParamPlaceholders replaces ^_^paramName^_^ placeholders with actual 
values
+func (r *Replacer) replaceParamPlaceholders(template string, paramMap 
map[string]string) string {
+       // Guard against empty templates to prevent strings.ReplaceAll issues
+       if template == "" {
+               return ""
+       }
+
+       result := template
+
+       // Find all ^_^paramName^_^ patterns and replace them
+       for paramName, paramValue := range paramMap {
+               // Guard against empty parameter names
+               if paramName == "" {
+                       continue
+               }
+               placeholder := fmt.Sprintf("^_^%s^_^", paramName)
+               result = strings.ReplaceAll(result, placeholder, paramValue)
+       }
+
+       return result
+}
+
+// ExtractProtocolConfig extracts and processes protocol configuration after 
parameter replacement
+func (r *Replacer) ExtractProtocolConfig(protocolInterface interface{}, 
targetStruct interface{}) error {
+       if protocolInterface == nil {
+               return fmt.Errorf("protocol interface is nil")
+       }
+
+       // If it's a map (from JSON parsing), convert to target struct
+       if protocolMap, ok := protocolInterface.(map[string]interface{}); ok {
+               // Convert map to JSON and then unmarshal to target struct
+               jsonData, err := json.Marshal(protocolMap)
+               if err != nil {
+                       return fmt.Errorf("failed to marshal protocol config: 
%w", err)
+               }
+
+               if err := json.Unmarshal(jsonData, targetStruct); err != nil {
+                       return fmt.Errorf("failed to unmarshal protocol config: 
%w", err)
+               }
+
+               return nil
+       }
+
+       return fmt.Errorf("unsupported protocol config type: %T", 
protocolInterface)
+}
+
+// ExtractJDBCConfig extracts and processes JDBC configuration after parameter 
replacement
+func (r *Replacer) ExtractJDBCConfig(jdbcInterface interface{}) 
(*jobtypes.JDBCProtocol, error) {
+       if jdbcInterface == nil {
+               return nil, nil
+       }
+
+       // If it's already a JDBCProtocol struct
+       if jdbcConfig, ok := jdbcInterface.(*jobtypes.JDBCProtocol); ok {
+               return jdbcConfig, nil
+       }
+
+       // Use the generic extraction method
+       var jdbcConfig jobtypes.JDBCProtocol
+       if err := r.ExtractProtocolConfig(jdbcInterface, &jdbcConfig); err != 
nil {
+               return nil, fmt.Errorf("failed to extract JDBC config: %w", err)
+       }
+
+       return &jdbcConfig, nil
+}
+
+// ExtractHTTPConfig extracts and processes HTTP configuration after parameter 
replacement
+func (r *Replacer) ExtractHTTPConfig(httpInterface interface{}) 
(*jobtypes.HTTPProtocol, error) {
+       if httpInterface == nil {
+               return nil, nil
+       }
+
+       // If it's already an HTTPProtocol struct
+       if httpConfig, ok := httpInterface.(*jobtypes.HTTPProtocol); ok {
+               return httpConfig, nil
+       }
+
+       // Use the generic extraction method
+       var httpConfig jobtypes.HTTPProtocol
+       if err := r.ExtractProtocolConfig(httpInterface, &httpConfig); err != 
nil {
+               return nil, fmt.Errorf("failed to extract HTTP config: %w", err)
+       }
+
+       return &httpConfig, nil
+}
+
+// ExtractSSHConfig extracts and processes SSH configuration after parameter 
replacement
+func (r *Replacer) ExtractSSHConfig(sshInterface interface{}) 
(*jobtypes.SSHProtocol, error) {
+       if sshInterface == nil {
+               return nil, nil
+       }
+
+       // If it's already an SSHProtocol struct
+       if sshConfig, ok := sshInterface.(*jobtypes.SSHProtocol); ok {
+               return sshConfig, nil
+       }
+
+       // Use the generic extraction method
+       var sshConfig jobtypes.SSHProtocol
+       if err := r.ExtractProtocolConfig(sshInterface, &sshConfig); err != nil 
{
+               return nil, fmt.Errorf("failed to extract SSH config: %w", err)
+       }
+
+       return &sshConfig, nil
+}
+
+// decryptPassword decrypts an encrypted password using AES
+// This implements the same algorithm as Java manager's AESUtil
+func (r *Replacer) decryptPassword(encryptedPassword string) (string, error) {
+       log := logger.DefaultLogger(os.Stdout, 
loggertype.LogLevelDebug).WithName("password-decrypt")
+
+       // Use the AES utility (matches Java: AesUtil.aesDecode)
+       if result, err := crypto.AesDecode(encryptedPassword); err == nil {
+               log.Info("password decrypted successfully", "length", 
len(result))
+               return result, nil
+       } else {
+               log.Sugar().Debugf("primary decryption failed: %v", err)
+       }
+
+       // Fallback: try with default Java key if no manager key is set
+       defaultKey := "tomSun28HaHaHaHa"
+       if result, err := crypto.AesDecodeWithKey(encryptedPassword, 
defaultKey); err == nil {
+               log.Info("password decrypted with default key", "length", 
len(result))
+               return result, nil
+       }
+
+       // If all decryption attempts fail, return original value to allow 
system to continue
+       log.Info("all decryption attempts failed, using original value")
+       return encryptedPassword, nil
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to