This is an automated email from the ASF dual-hosted git repository.

liuhongyu pushed a commit to branch chore/message
in repository https://gitbox.apache.org/repos/asf/hertzbeat-collector-go.git

commit f426cd0fd3acdf716e0557622e224562c6323d57
Author: liuhy <[email protected]>
AuthorDate: Wed Sep 24 19:31:43 2025 +0800

    feat(grpc): implement logging for one-time task message processing
---
 internal/transport/netty_client.go | 37 +++++++++++++++++++++++++++++--------
 internal/transport/processors.go   | 30 +++++++++++++++++++++++-------
 2 files changed, 52 insertions(+), 15 deletions(-)

diff --git a/internal/transport/netty_client.go 
b/internal/transport/netty_client.go
index 25bf51e..be464e6 100644
--- a/internal/transport/netty_client.go
+++ b/internal/transport/netty_client.go
@@ -570,14 +570,35 @@ func RegisterDefaultNettyProcessors(client *NettyClient) {
 
        client.RegisterProcessor(MessageTypeIssueOneTimeTask, func(msg 
interface{}) (interface{}, error) {
                if pbMsg, ok := msg.(*pb.Message); ok {
-                       // Handle one-time task message
-                       // TODO: Implement actual task processing logic
-                       return &pb.Message{
-                               Type:      pb.MessageType_ISSUE_ONE_TIME_TASK,
-                               Direction: pb.Direction_RESPONSE,
-                               Identity:  pbMsg.Identity,
-                               Msg:       []byte("one-time task ack"),
-                       }, nil
+                       // Use fmt.Printf to ensure the log is visible
+                       fmt.Printf("=== ONE-TIME TASK RECEIVED ===\n")
+                       fmt.Printf("Message Type: %d (ISSUE_ONE_TIME_TASK)\n", 
pbMsg.Type)
+                       fmt.Printf("Message Content: %s\n", string(pbMsg.Msg))
+                       fmt.Printf("Message Identity: %s\n", pbMsg.Identity)
+                       fmt.Printf("==============================\n")
+
+                       // TODO: Parse Job from pbMsg.Msg and execute the task
+                       // For now, just log the message content and return nil 
(no immediate response)
+                       // The actual response should be sent as 
RESPONSE_ONE_TIME_TASK_DATA (type 7)
+                       // after the task is completed
+
+                       // Simulate task processing
+                       go func() {
+                               fmt.Printf("=== PROCESSING ONE-TIME TASK ===\n")
+                               // TODO: Implement actual task execution here
+                               // This should:
+                               // 1. Parse the Job from pbMsg.Msg
+                               // 2. Execute the collection task
+                               // 3. Send RESPONSE_ONE_TIME_TASK_DATA message 
with results
+
+                               // For now, just log that we would process the 
task
+                               fmt.Printf("One-time task processing completed 
(simulated)\n")
+                               
fmt.Printf("==========================================\n")
+                       }()
+
+                       // Return nil - no immediate response needed
+                       // The actual response will be sent asynchronously
+                       return nil, nil
                }
                return nil, fmt.Errorf("invalid message type")
        })
diff --git a/internal/transport/processors.go b/internal/transport/processors.go
index 8bd71e3..d2f0abf 100644
--- a/internal/transport/processors.go
+++ b/internal/transport/processors.go
@@ -176,13 +176,29 @@ func NewCollectOneTimeDataProcessor(client *GrpcClient) 
*CollectOneTimeDataProce
 
 func (p *CollectOneTimeDataProcessor) Process(msg *pb.Message) (*pb.Message, 
error) {
        // Handle one-time task message
-       // TODO: Implement actual task processing logic
-       return &pb.Message{
-               Type:      pb.MessageType_ISSUE_ONE_TIME_TASK,
-               Direction: pb.Direction_RESPONSE,
-               Identity:  msg.Identity,
-               Msg:       []byte("one-time task ack"),
-       }, nil
+       log.Printf("Received one-time task message: %s", string(msg.Msg))
+
+       // TODO: Parse Job from msg.Msg and execute the task
+       // For now, just log the message content and return nil (no immediate 
response)
+       // The actual response should be sent as RESPONSE_ONE_TIME_TASK_DATA 
(type 7)
+       // after the task is completed
+
+       // Simulate task processing
+       go func() {
+               log.Printf("Processing one-time task...")
+               // TODO: Implement actual task execution here
+               // This should:
+               // 1. Parse the Job from msg.Msg
+               // 2. Execute the collection task
+               // 3. Send RESPONSE_ONE_TIME_TASK_DATA message with results
+
+               // For now, just log that we would process the task
+               log.Printf("One-time task processing completed (simulated)")
+       }()
+
+       // Return nil - no immediate response needed
+       // The actual response will be sent asynchronously
+       return nil, nil
 }
 
 // RegisterDefaultProcessors registers all default message processors


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to