Copilot commented on code in PR #777:
URL: https://github.com/apache/skywalking-java/pull/777#discussion_r2501318194
##########
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java:
##########
@@ -57,6 +58,8 @@ public class GRPCChannelManager implements BootService, Runnable {
private volatile List<String> grpcServers;
private volatile int selectedIdx = -1;
private volatile int reconnectCount = 0;
+ private volatile int transientFailureCount = 0;
Review Comment:
The `transientFailureCount` variable is modified in
`checkChannelStateAndTriggerReconnectIfNeeded()` (lines 265 and 274) and read in
the `run()` method (line 158) without synchronization. This can lead to race
conditions: `volatile` guarantees visibility but does not make
`transientFailureCount++` atomic, and `checkChannelStateAndTriggerReconnectIfNeeded()`
is called early in `run()` (line 112), before the reconnect block that reads this
value. Consider protecting reads and writes to this variable with `statusLock` or
using an `AtomicInteger`.
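A minimal sketch of the `AtomicInteger` option, assuming the counter only needs increment, reset, and read operations. `TransientFailureTracker` and its method names are hypothetical, not part of SkyWalking; the point is that `incrementAndGet()` is a single atomic read-modify-write, unlike `++` on a `volatile int`.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical holder for the TRANSIENT_FAILURE counter. Every operation is atomic,
 * so concurrent increments are never lost and a reset cannot be half-applied.
 */
class TransientFailureTracker {
    private final AtomicInteger transientFailureCount = new AtomicInteger();

    /** Records one more TRANSIENT_FAILURE observation and returns the new count. */
    int recordFailure() {
        return transientFailureCount.incrementAndGet();
    }

    /** Clears the counter, e.g. when the channel is observed in a healthy state. */
    void reset() {
        transientFailureCount.set(0);
    }

    /** Reads the current count, e.g. from the reconnect block in run(). */
    int current() {
        return transientFailureCount.get();
    }
}
```

With this shape, the state check would call `recordFailure()` or `reset()`, and the reconnect block in `run()` would call `current()`.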
##########
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java:
##########
@@ -130,32 +141,33 @@ public void run() {
String server = "";
try {
int index = Math.abs(random.nextInt()) % grpcServers.size();
+
+ server = grpcServers.get(index);
+ String[] ipAndPort = server.split(":");
+
if (index != selectedIdx) {
selectedIdx = index;
+ LOGGER.debug("Connecting to different gRPC server {}. Shutting down existing channel if any.", server);
+ createNewChannel(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
+ } else {
+ // Same server, increment reconnectCount and check state
+ reconnectCount++;
Review Comment:
The `reconnectCount` variable is incremented here (line 154) without
synchronization, but it is also reset in `markAsConnected()`, which holds
`statusLock` (line 248). This creates inconsistent synchronization. The increment
should also be protected by `statusLock` to prevent races between the increment
and reset operations.
```suggestion
synchronized (statusLock) {
reconnectCount++;
}
```
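A minimal sketch of the consistent-locking pattern the suggestion applies, assuming every access to `reconnectCount` goes through one lock object. `ReconnectState` and its methods are hypothetical; only the `statusLock` and `reconnectCount` names mirror the PR. Because all reads and writes hold the same lock, the field no longer needs to be `volatile`.

```java
/**
 * Hypothetical sketch: the increment in run() and the reset in markAsConnected()
 * both hold statusLock, so a reset can never interleave with a half-done increment.
 */
class ReconnectState {
    private final Object statusLock = new Object();
    private int reconnectCount = 0; // guarded by statusLock

    /** Called from run() when retrying the same server. */
    int incrementReconnectCount() {
        synchronized (statusLock) {
            return ++reconnectCount;
        }
    }

    /** Called once the connection is considered healthy again. */
    void markAsConnected() {
        synchronized (statusLock) {
            reconnectCount = 0;
        }
    }

    /** Reads the count under the same lock that guards the writes. */
    int reconnectCount() {
        synchronized (statusLock) {
            return reconnectCount;
        }
    }
}
```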
##########
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java:
##########
@@ -184,17 +196,85 @@ public Channel getChannel() {
*/
public void reportError(Throwable throwable) {
if (isNetworkError(throwable)) {
+ triggerReconnect();
+ }
+ }
+
+ private void notify(GRPCChannelStatus status) {
+ synchronized (listeners) {
+ for (GRPCChannelListener listener : listeners) {
+ try {
+ listener.statusChanged(status);
+ } catch (Throwable t) {
+ LOGGER.error(t, "Fail to notify {} about channel connected.", listener.getClass().getName());
+ }
+ }
+ }
+ }
+
+ /**
+ * Create a new gRPC channel to the specified server and reset connection state.
+ */
+ private void createNewChannel(String host, int port) throws Exception {
+ if (managedChannel != null) {
+ managedChannel.shutdownNow();
+ }
+
+ managedChannel = GRPCChannel.newBuilder(host, port)
+ .addManagedChannelBuilder(new StandardChannelBuilder())
+ .addManagedChannelBuilder(new TLSChannelBuilder())
+ .addChannelDecorator(new AgentIDDecorator())
+ .addChannelDecorator(new AuthenticationDecorator())
+ .build();
+
+ markAsConnected();
+ }
+
+ /**
+ * Trigger reconnection by setting reconnect flag and notifying listeners.
+ */
+ private void triggerReconnect() {
+ synchronized (statusLock) {
reconnect = true;
notify(GRPCChannelStatus.DISCONNECT);
}
}
- private void notify(GRPCChannelStatus status) {
- for (GRPCChannelListener listener : listeners) {
+ /**
+ * Mark connection as successful and reset connection state.
+ */
+ private void markAsConnected() {
+ synchronized (statusLock) {
+ reconnectCount = 0;
+ reconnect = false;
+ notify(GRPCChannelStatus.CONNECTED);
+ }
+ }
+
+ /**
+ * Check the connectivity state of existing channel and trigger reconnect if needed.
+ * This method monitors TRANSIENT_FAILURE state and triggers reconnect if the failure persists too long.
+ */
+ private void checkChannelStateAndTriggerReconnectIfNeeded() {
+ if (managedChannel != null) {
try {
- listener.statusChanged(status);
+ ConnectivityState state = managedChannel.getState(false);
+ LOGGER.debug("Current channel state: {}", state);
+
+ if (state == ConnectivityState.TRANSIENT_FAILURE) {
+ transientFailureCount++;
+ LOGGER.warn("Channel in TRANSIENT_FAILURE state, count: {}", transientFailureCount);
+ } else if (state == ConnectivityState.SHUTDOWN) {
+ LOGGER.warn("Channel is SHUTDOWN");
+ if (!reconnect) {
+ triggerReconnect();
+ }
+ } else {
+ // IDLE, READY, CONNECTING are all normal states
+ transientFailureCount = 0;
+ }
Review Comment:
The `checkChannelStateAndTriggerReconnectIfNeeded()` method increments
`transientFailureCount` when detecting TRANSIENT_FAILURE state, but never
actually triggers a reconnect based on this count. The force reconnect logic at
lines 157-158 checks this count, but that code only executes when `reconnect` is
already true. This means a channel stuck in TRANSIENT_FAILURE when `reconnect`
is false will never trigger a reconnect. Consider triggering reconnect in this
method when `transientFailureCount` exceeds the threshold.
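A minimal sketch of triggering the reconnect from the state check itself, assuming the check runs only on the single `run()` thread. `State` is a local stand-in for `io.grpc.ConnectivityState` (same constant names), and `TRANSIENT_FAILURE_THRESHOLD = 3` is an assumed value, not an existing SkyWalking setting; `ChannelStateChecker` is hypothetical.

```java
/**
 * Hypothetical sketch: the TRANSIENT_FAILURE count is acted on where it is updated,
 * so a channel stuck in that state triggers a reconnect even while reconnect is false.
 */
class ChannelStateChecker {
    /** Local stand-in for io.grpc.ConnectivityState. */
    enum State { IDLE, CONNECTING, READY, TRANSIENT_FAILURE, SHUTDOWN }

    static final int TRANSIENT_FAILURE_THRESHOLD = 3; // assumed value

    private int transientFailureCount = 0;
    private boolean reconnect = false;

    void check(State state) {
        switch (state) {
            case TRANSIENT_FAILURE:
                transientFailureCount++;
                // Trigger here instead of relying on a block that only runs when reconnect is already true.
                if (transientFailureCount >= TRANSIENT_FAILURE_THRESHOLD && !reconnect) {
                    triggerReconnect();
                }
                break;
            case SHUTDOWN:
                if (!reconnect) {
                    triggerReconnect();
                }
                break;
            default:
                // IDLE, CONNECTING and READY are treated as healthy states.
                transientFailureCount = 0;
                break;
        }
    }

    private void triggerReconnect() {
        reconnect = true; // in the PR this also notifies listeners of DISCONNECT
    }

    boolean isReconnect() {
        return reconnect;
    }

    int transientFailureCount() {
        return transientFailureCount;
    }
}
```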
##########
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java:
##########
@@ -210,6 +210,20 @@ public static class Collector {
* How long grpc client will timeout in sending data to upstream.
*/
public static int GRPC_UPSTREAM_TIMEOUT = 30;
+ /**
+ * The interval in seconds to send a keepalive ping to the backend.
+ * If this is less than or equal to 0, the keepalive is disabled.
+ *
Review Comment:
[nitpick] The `GRPC_KEEPALIVE_TIME` field is declared as a `long` and is interpreted
in seconds: it is configured in agent.config (line 105) and applied with
`TimeUnit.SECONDS` in GRPCChannel.java (line 45). The validation at line 44 in
GRPCChannel only checks that the value is `> 0`, but according to the gRPC
documentation, keepalive time values below a certain threshold (typically 10
seconds) may be rejected by the server. Consider adding a comment documenting the
minimum safe value or adding validation.
```suggestion
* <p>
* <b>Note:</b> The minimum safe value is 10 seconds. Values below
* this may be rejected by the gRPC server.
```
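A minimal sketch of the "adding validation" option, assuming a configured value is in seconds, `<= 0` means disabled (as the field's Javadoc states), and positive values below the 10-second floor from this comment are raised to the floor with a warning. `KeepAliveConfig`, `MIN_KEEPALIVE_SECONDS`, and `effectiveKeepAliveSeconds()` are hypothetical names; the returned value could then be passed to a channel builder with `TimeUnit.SECONDS`, as GRPCChannel reportedly does.

```java
/**
 * Hypothetical validation helper for Config.Collector.GRPC_KEEPALIVE_TIME (seconds).
 * Clamping is one possible policy; rejecting the value at startup would be another.
 */
class KeepAliveConfig {
    static final long MIN_KEEPALIVE_SECONDS = 10L; // floor taken from the review comment

    /**
     * Returns the keepalive time to apply in seconds, or -1 when keepalive is disabled.
     * Positive values below the minimum are raised to the minimum.
     */
    static long effectiveKeepAliveSeconds(long configured) {
        if (configured <= 0) {
            return -1L; // keepalive disabled
        }
        if (configured < MIN_KEEPALIVE_SECONDS) {
            System.err.println("GRPC_KEEPALIVE_TIME=" + configured + "s is below "
                + MIN_KEEPALIVE_SECONDS + "s; using the minimum instead.");
            return MIN_KEEPALIVE_SECONDS;
        }
        return configured;
    }
}
```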
##########
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java:
##########
@@ -184,17 +196,85 @@ public Channel getChannel() {
+ /**
+ * Mark connection as successful and reset connection state.
+ */
+ private void markAsConnected() {
+ synchronized (statusLock) {
+ reconnectCount = 0;
+ reconnect = false;
+ notify(GRPCChannelStatus.CONNECTED);
+ }
+ }
Review Comment:
The `markAsConnected()` method resets `reconnectCount` and `reconnect`, but not
`transientFailureCount`. `transientFailureCount` should also be reset to 0 here,
since a successful connection means the transient failures are resolved.
Currently `transientFailureCount` is only reset in
`checkChannelStateAndTriggerReconnectIfNeeded()` when the state is not
TRANSIENT_FAILURE; resetting it when marking the channel as connected keeps the
two code paths consistent.
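A minimal sketch of a `markAsConnected()` that clears all of the failure bookkeeping under `statusLock`. The field names follow the PR, but `ConnectionState` and its helper methods are hypothetical, and listener notification is omitted to keep the focus on the reset.

```java
/**
 * Hypothetical sketch: all connection bookkeeping is guarded by statusLock, and a
 * successful connection clears every counter, including transientFailureCount.
 */
class ConnectionState {
    private final Object statusLock = new Object();
    private int reconnectCount = 0;        // guarded by statusLock
    private int transientFailureCount = 0; // guarded by statusLock
    private boolean reconnect = false;     // guarded by statusLock

    void recordTransientFailure() {
        synchronized (statusLock) {
            transientFailureCount++;
        }
    }

    void incrementReconnectCount() {
        synchronized (statusLock) {
            reconnectCount++;
        }
    }

    void triggerReconnect() {
        synchronized (statusLock) {
            reconnect = true;
        }
    }

    /** Resets every piece of failure state once the channel is healthy. */
    void markAsConnected() {
        synchronized (statusLock) {
            reconnectCount = 0;
            transientFailureCount = 0; // the reset this comment asks for
            reconnect = false;
        }
    }

    int transientFailureCount() {
        synchronized (statusLock) {
            return transientFailureCount;
        }
    }

    int reconnectCount() {
        synchronized (statusLock) {
            return reconnectCount;
        }
    }

    boolean isReconnect() {
        synchronized (statusLock) {
            return reconnect;
        }
    }
}
```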
##########
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java:
##########
@@ -184,17 +196,85 @@ public Channel getChannel() {
+ /**
+ * Create a new gRPC channel to the specified server and reset connection state.
+ */
+ private void createNewChannel(String host, int port) throws Exception {
+ if (managedChannel != null) {
+ managedChannel.shutdownNow();
+ }
+
+ managedChannel = GRPCChannel.newBuilder(host, port)
+ .addManagedChannelBuilder(new StandardChannelBuilder())
+ .addManagedChannelBuilder(new TLSChannelBuilder())
+ .addChannelDecorator(new AgentIDDecorator())
+ .addChannelDecorator(new AuthenticationDecorator())
+ .build();
+
+ markAsConnected();
Review Comment:
The `createNewChannel()` method calls `markAsConnected()` immediately after
creating a new channel, but this is incorrect. A newly created channel is not
necessarily in a CONNECTED state - it may still be connecting. This can lead to
false CONNECTED notifications to listeners. Consider either checking the actual
channel state before calling `markAsConnected()`, or only resetting the reconnect
state without notifying CONNECTED until the channel is actually ready.
```suggestion
// Do not call markAsConnected() here; the channel may not be connected yet.
synchronized (statusLock) {
reconnectCount = 0;
reconnect = false;
}
```
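A minimal sketch of the "notify CONNECTED only when truly ready" option, assuming the periodic state check reports each observed state. `State` is a local stand-in for `io.grpc.ConnectivityState`, the `String` list stands in for `GRPCChannelListener` callbacks, and `DeferredConnectNotifier` is hypothetical. With real gRPC, the observed state could come from `ManagedChannel.getState(false)`, which the PR already calls, or from a `ManagedChannel.notifyWhenStateChanged(...)` callback.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: creating a channel only resets retry bookkeeping; the
 * CONNECTED notification is sent once, when READY is actually observed.
 */
class DeferredConnectNotifier {
    /** Local stand-in for io.grpc.ConnectivityState. */
    enum State { IDLE, CONNECTING, READY, TRANSIENT_FAILURE, SHUTDOWN }

    private final Object statusLock = new Object();
    private int reconnectCount = 0;            // guarded by statusLock
    private boolean reconnect = true;          // guarded by statusLock
    private boolean connectedNotified = false; // guarded by statusLock
    private final List<String> notifications = new ArrayList<>(); // stand-in for listener calls

    /** Called right after a new channel is built: reset state, but do not announce CONNECTED. */
    void onChannelCreated() {
        synchronized (statusLock) {
            reconnectCount = 0;
            reconnect = false;
            connectedNotified = false;
        }
    }

    /** Called from the periodic state check with the channel's current state. */
    void onStateObserved(State state) {
        synchronized (statusLock) {
            if (state == State.READY && !connectedNotified) {
                connectedNotified = true;
                notifications.add("CONNECTED");
            }
        }
    }

    boolean isReconnect() {
        synchronized (statusLock) {
            return reconnect;
        }
    }

    List<String> notifications() {
        synchronized (statusLock) {
            return new ArrayList<>(notifications);
        }
    }
}
```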