Copilot commented on code in PR #777:
URL: https://github.com/apache/skywalking-java/pull/777#discussion_r2501318194


##########
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java:
##########
@@ -57,6 +58,8 @@ public class GRPCChannelManager implements BootService, Runnable {
     private volatile List<String> grpcServers;
     private volatile int selectedIdx = -1;
     private volatile int reconnectCount = 0;
+    private volatile int transientFailureCount = 0;

Review Comment:
   The `transientFailureCount` variable is modified in 
`checkChannelStateAndTriggerReconnectIfNeeded()` (lines 265 and 274) and read in 
the `run()` method (line 158) without synchronization. The field is `volatile`, 
which guarantees visibility, but `transientFailureCount++` is a 
read-modify-write and is not atomic. This can lead to race conditions, since 
`checkChannelStateAndTriggerReconnectIfNeeded()` is called early in `run()` 
(line 112), before the reconnect block that reads this value. Consider 
protecting reads and writes to this variable with `statusLock`, or using 
`AtomicInteger`.
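
A minimal sketch of the `AtomicInteger` option, for illustration only. The class name `TransientFailureCounter` and its methods are hypothetical and do not appear in the PR; the sketch only shows how increments, resets, and reads can each be made atomic without taking `statusLock`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of the PR: wraps the failure counter so that
// every increment, reset, and read is a single atomic operation.
final class TransientFailureCounter {
    private final AtomicInteger count = new AtomicInteger();

    /** Record one TRANSIENT_FAILURE observation and return the new total. */
    int recordFailure() {
        return count.incrementAndGet();
    }

    /** Reset once the channel reports a normal state again. */
    void reset() {
        count.set(0);
    }

    /** Read the current value, for example from the reconnect block in run(). */
    int get() {
        return count.get();
    }
}
```

If the counter must change together with `reconnect` or `reconnectCount`, guarding all three with `statusLock` is likely the simpler choice, because an atomic counter alone does not make a multi-field update consistent.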



##########
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java:
##########
@@ -130,32 +141,33 @@ public void run() {
                 String server = "";
                 try {
                     int index = Math.abs(random.nextInt()) % grpcServers.size();
+
+                    server = grpcServers.get(index);
+                    String[] ipAndPort = server.split(":");
+
                     if (index != selectedIdx) {
                         selectedIdx = index;
+                        LOGGER.debug("Connecting to different gRPC server {}. Shutting down existing channel if any.", server);
+                        createNewChannel(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
+                    } else {
+                        // Same server, increment reconnectCount and check state
+                        reconnectCount++;

Review Comment:
   The `reconnectCount` variable is incremented here without synchronization 
(line 154), but is also reset in `markAsConnected()` which uses `statusLock` 
(line 248). This creates inconsistent synchronization. The increment should 
also be protected by `statusLock` to prevent race conditions between the 
increment and reset operations.
   ```suggestion
                           synchronized (statusLock) {
                               reconnectCount++;
                           }
   ```



##########
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java:
##########
@@ -184,17 +196,85 @@ public Channel getChannel() {
      */
     public void reportError(Throwable throwable) {
         if (isNetworkError(throwable)) {
+            triggerReconnect();
+        }
+    }
+
+    private void notify(GRPCChannelStatus status) {
+        synchronized (listeners) {
+            for (GRPCChannelListener listener : listeners) {
+                try {
+                    listener.statusChanged(status);
+                } catch (Throwable t) {
+                    LOGGER.error(t, "Fail to notify {} about channel connected.", listener.getClass().getName());
+                }
+            }
+        }
+    }
+
+    /**
+     * Create a new gRPC channel to the specified server and reset connection state.
+     */
+    private void createNewChannel(String host, int port) throws Exception {
+        if (managedChannel != null) {
+            managedChannel.shutdownNow();
+        }
+
+        managedChannel = GRPCChannel.newBuilder(host, port)
+                                    .addManagedChannelBuilder(new StandardChannelBuilder())
+                                    .addManagedChannelBuilder(new TLSChannelBuilder())
+                                    .addChannelDecorator(new AgentIDDecorator())
+                                    .addChannelDecorator(new AuthenticationDecorator())
+                                    .build();
+
+        markAsConnected();
+    }
+
+    /**
+     * Trigger reconnection by setting reconnect flag and notifying listeners.
+     */
+    private void triggerReconnect() {
+        synchronized (statusLock) {
             reconnect = true;
             notify(GRPCChannelStatus.DISCONNECT);
         }
     }
 
-    private void notify(GRPCChannelStatus status) {
-        for (GRPCChannelListener listener : listeners) {
+    /**
+     * Mark connection as successful and reset connection state.
+     */
+    private void markAsConnected() {
+        synchronized (statusLock) {
+            reconnectCount = 0;
+            reconnect = false;
+            notify(GRPCChannelStatus.CONNECTED);
+        }
+    }
+
+    /**
+     * Check the connectivity state of existing channel and trigger reconnect if needed.
+     * This method monitors TRANSIENT_FAILURE state and triggers reconnect if the failure persists too long.
+     */
+    private void checkChannelStateAndTriggerReconnectIfNeeded() {
+        if (managedChannel != null) {
             try {
-                listener.statusChanged(status);
+                ConnectivityState state = managedChannel.getState(false);
+                LOGGER.debug("Current channel state: {}", state);
+
+                if (state == ConnectivityState.TRANSIENT_FAILURE) {
+                    transientFailureCount++;
+                    LOGGER.warn("Channel in TRANSIENT_FAILURE state, count: {}", transientFailureCount);
+                } else if (state == ConnectivityState.SHUTDOWN) {
+                    LOGGER.warn("Channel is SHUTDOWN");
+                    if (!reconnect) {
+                        triggerReconnect();
+                    }
+                } else {
+                    // IDLE, READY, CONNECTING are all normal states
+                    transientFailureCount = 0;
+                }

Review Comment:
   The `checkChannelStateAndTriggerReconnectIfNeeded()` method increments 
`transientFailureCount` when it detects the TRANSIENT_FAILURE state, but never 
triggers a reconnect based on this count. The force-reconnect logic at 
lines 157-158 checks the count, but that code only executes when `reconnect` is 
already true. As a result, a channel stuck in TRANSIENT_FAILURE while 
`reconnect` is false will never trigger a reconnect. Consider triggering the 
reconnect in this method when `transientFailureCount` exceeds the threshold.
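
A hedged sketch of the behaviour suggested above. To compile on its own it uses a local enum `SketchState` as a stand-in for `io.grpc.ConnectivityState`; the class `ChannelStateMonitor` and the threshold `MAX_TRANSIENT_FAILURES` are hypothetical, and the PR's actual threshold is not visible in this hunk.

```java
// Local stand-in for io.grpc.ConnectivityState so the sketch compiles on its own.
enum SketchState { IDLE, CONNECTING, READY, TRANSIENT_FAILURE, SHUTDOWN }

// Hypothetical model of the state check; not the PR's implementation.
final class ChannelStateMonitor {
    // Assumed threshold, chosen only for illustration.
    private static final int MAX_TRANSIENT_FAILURES = 5;

    private final Object statusLock = new Object();
    private int transientFailureCount = 0;
    private boolean reconnect = false;

    /** Called once per run() cycle with the channel's current state. */
    void check(SketchState state) {
        synchronized (statusLock) {
            if (state == SketchState.TRANSIENT_FAILURE) {
                transientFailureCount++;
                // Request a reconnect here, instead of waiting for
                // another code path to set the flag first.
                if (transientFailureCount >= MAX_TRANSIENT_FAILURES) {
                    reconnect = true;
                }
            } else if (state == SketchState.SHUTDOWN) {
                reconnect = true;
            } else {
                // IDLE, READY, CONNECTING are all normal states.
                transientFailureCount = 0;
            }
        }
    }

    boolean isReconnectRequested() {
        synchronized (statusLock) {
            return reconnect;
        }
    }
}
```

With this shape, the count and the flag are updated under the same lock, so the reconnect block in `run()` never observes a count above the threshold while the flag is still false.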



##########
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java:
##########
@@ -210,6 +210,20 @@ public static class Collector {
          * How long grpc client will timeout in sending data to upstream.
          */
         public static int GRPC_UPSTREAM_TIMEOUT = 30;
+        /**
+         * The interval in seconds to send a keepalive ping to the backend.
+         * If this is less than or equal to 0, the keepalive is disabled.
+         *

Review Comment:
   [nitpick] The `GRPC_KEEPALIVE_TIME` field is declared as `long` and is 
applied with `TimeUnit.SECONDS` in GRPCChannel.java (line 45); it is documented 
in agent.config (line 105). The validation at line 44 in GRPCChannel only 
checks that the value is `> 0`, but according to the gRPC documentation, 
keepalive time values below a certain threshold (typically 10 seconds) may be 
rejected by the server. Consider adding a comment documenting the minimum safe 
value, or adding validation.
   ```suggestion
            * <p>
            * <b>Note:</b> The minimum safe value is 10 seconds. Values below this may be rejected by the gRPC server.
   ```
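
If validation is preferred over documentation, one possible shape is sketched below. The class `KeepAliveConfig`, the method `effectiveKeepAliveSeconds`, and the 10-second floor are assumptions taken from this comment, not from the PR or from a verified gRPC limit.

```java
// Hypothetical validation helper; names and the floor value are assumptions.
final class KeepAliveConfig {
    // Assumed minimum, taken from the review comment rather than from the PR.
    static final long MIN_SAFE_KEEPALIVE_SECONDS = 10;

    private KeepAliveConfig() {
    }

    /**
     * Returns 0 when keepalive is disabled (configured value <= 0),
     * otherwise the configured value raised to the assumed minimum.
     */
    static long effectiveKeepAliveSeconds(long configuredSeconds) {
        if (configuredSeconds <= 0) {
            return 0;
        }
        return Math.max(configuredSeconds, MIN_SAFE_KEEPALIVE_SECONDS);
    }
}
```

Clamping silently changes what the user configured, so logging a warning whenever the value is raised would probably be worth adding.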



##########
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java:
##########
@@ -184,17 +196,85 @@ public Channel getChannel() {
      */
     public void reportError(Throwable throwable) {
         if (isNetworkError(throwable)) {
+            triggerReconnect();
+        }
+    }
+
+    private void notify(GRPCChannelStatus status) {
+        synchronized (listeners) {
+            for (GRPCChannelListener listener : listeners) {
+                try {
+                    listener.statusChanged(status);
+                } catch (Throwable t) {
+                    LOGGER.error(t, "Fail to notify {} about channel connected.", listener.getClass().getName());
+                }
+            }
+        }
+    }
+
+    /**
+     * Create a new gRPC channel to the specified server and reset connection state.
+     */
+    private void createNewChannel(String host, int port) throws Exception {
+        if (managedChannel != null) {
+            managedChannel.shutdownNow();
+        }
+
+        managedChannel = GRPCChannel.newBuilder(host, port)
+                                    .addManagedChannelBuilder(new StandardChannelBuilder())
+                                    .addManagedChannelBuilder(new TLSChannelBuilder())
+                                    .addChannelDecorator(new AgentIDDecorator())
+                                    .addChannelDecorator(new AuthenticationDecorator())
+                                    .build();
+
+        markAsConnected();
+    }
+
+    /**
+     * Trigger reconnection by setting reconnect flag and notifying listeners.
+     */
+    private void triggerReconnect() {
+        synchronized (statusLock) {
             reconnect = true;
             notify(GRPCChannelStatus.DISCONNECT);
         }
     }
 
-    private void notify(GRPCChannelStatus status) {
-        for (GRPCChannelListener listener : listeners) {
+    /**
+     * Mark connection as successful and reset connection state.
+     */
+    private void markAsConnected() {
+        synchronized (statusLock) {
+            reconnectCount = 0;
+            reconnect = false;
+            notify(GRPCChannelStatus.CONNECTED);
+        }
+    }

Review Comment:
   The `markAsConnected()` method resets `reconnectCount` and `reconnect`, but 
`transientFailureCount` should be reset to 0 here as well, since a successful 
connection means transient failures are resolved. Currently 
`transientFailureCount` is only reset in 
`checkChannelStateAndTriggerReconnectIfNeeded()` when the state is not 
TRANSIENT_FAILURE; it should also be reset when marking the channel as 
connected, to keep the counters consistent.
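
A small sketch of the reset described above, assuming all three fields are guarded by `statusLock`. The class `ConnectionStatus` and its accessor methods are hypothetical, and listener notification is left out to keep the example self-contained.

```java
// Hypothetical model of the connection bookkeeping; not the PR's class.
final class ConnectionStatus {
    private final Object statusLock = new Object();
    private int reconnectCount = 0;
    private int transientFailureCount = 0;
    private boolean reconnect = false;

    void recordReconnectAttempt() {
        synchronized (statusLock) {
            reconnect = true;
            reconnectCount++;
        }
    }

    void recordTransientFailure() {
        synchronized (statusLock) {
            transientFailureCount++;
        }
    }

    /** Reset every failure-related field in one critical section. */
    void markAsConnected() {
        synchronized (statusLock) {
            reconnectCount = 0;
            transientFailureCount = 0;
            reconnect = false;
            // The real method would also notify listeners of CONNECTED here.
        }
    }

    /** True when no failure state is left over from a previous connection. */
    boolean isClean() {
        synchronized (statusLock) {
            return reconnectCount == 0 && transientFailureCount == 0 && !reconnect;
        }
    }
}
```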



##########
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java:
##########
@@ -184,17 +196,85 @@ public Channel getChannel() {
      */
     public void reportError(Throwable throwable) {
         if (isNetworkError(throwable)) {
+            triggerReconnect();
+        }
+    }
+
+    private void notify(GRPCChannelStatus status) {
+        synchronized (listeners) {
+            for (GRPCChannelListener listener : listeners) {
+                try {
+                    listener.statusChanged(status);
+                } catch (Throwable t) {
+                    LOGGER.error(t, "Fail to notify {} about channel connected.", listener.getClass().getName());
+                }
+            }
+        }
+    }
+
+    /**
+     * Create a new gRPC channel to the specified server and reset connection state.
+     */
+    private void createNewChannel(String host, int port) throws Exception {
+        if (managedChannel != null) {
+            managedChannel.shutdownNow();
+        }
+
+        managedChannel = GRPCChannel.newBuilder(host, port)
+                                    .addManagedChannelBuilder(new StandardChannelBuilder())
+                                    .addManagedChannelBuilder(new TLSChannelBuilder())
+                                    .addChannelDecorator(new AgentIDDecorator())
+                                    .addChannelDecorator(new AuthenticationDecorator())
+                                    .build();
+
+        markAsConnected();

Review Comment:
   The `createNewChannel()` method calls `markAsConnected()` immediately after 
creating a new channel, but this is incorrect. A newly created channel is not 
necessarily in a CONNECTED state; it may still be connecting. This can lead to 
false CONNECTED notifications to listeners. Consider either checking the actual 
channel state before calling `markAsConnected()`, or resetting only the 
reconnect flags and deferring the CONNECTED notification until the channel is 
truly ready.
   ```suggestion
           // Do not call markAsConnected() here; the channel may not be connected yet.
           synchronized (statusLock) {
               reconnectCount = 0;
               reconnect = false;
           }
   ```
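
For the first option (announcing CONNECTED only once the channel is actually ready), a rough sketch follows. It models the idea with a local enum `ChannelState` standing in for `io.grpc.ConnectivityState` and a `Runnable` standing in for `notify(GRPCChannelStatus.CONNECTED)`; the class `ConnectedNotifier` is hypothetical. In the real manager, the observed state could come from `managedChannel.getState(false)`, which the PR already calls in `checkChannelStateAndTriggerReconnectIfNeeded()`.

```java
// Local stand-in for io.grpc.ConnectivityState so the sketch compiles on its own.
enum ChannelState { IDLE, CONNECTING, READY, TRANSIENT_FAILURE, SHUTDOWN }

// Hypothetical model: defers the CONNECTED notification until READY is observed.
final class ConnectedNotifier {
    private final Object statusLock = new Object();
    // Stands in for notify(GRPCChannelStatus.CONNECTED).
    private final Runnable onConnected;
    private boolean reconnect = true;
    private boolean connectedNotified = false;

    ConnectedNotifier(Runnable onConnected) {
        this.onConnected = onConnected;
    }

    /** After building a new channel: clear the flag, but do not announce CONNECTED. */
    void onChannelCreated() {
        synchronized (statusLock) {
            reconnect = false;
            connectedNotified = false;
        }
    }

    /** Feed in each observed state; CONNECTED is announced once, on the first READY. */
    void onStateObserved(ChannelState state) {
        synchronized (statusLock) {
            if (state == ChannelState.READY && !connectedNotified) {
                connectedNotified = true;
                onConnected.run();
            }
        }
    }

    boolean isReconnectRequested() {
        synchronized (statusLock) {
            return reconnect;
        }
    }
}
```

The trade-off is that listeners learn about the connection one polling cycle later than they do today, in exchange for never being told CONNECTED about a channel that is still connecting.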



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
