hzhaop commented on code in PR #777:
URL: https://github.com/apache/skywalking-java/pull/777#discussion_r2509298282
##########
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java:
##########
@@ -184,17 +196,85 @@ public Channel getChannel() {
*/
public void reportError(Throwable throwable) {
if (isNetworkError(throwable)) {
+ triggerReconnect();
+ }
+ }
+
+ private void notify(GRPCChannelStatus status) {
+ synchronized (listeners) {
+ for (GRPCChannelListener listener : listeners) {
+ try {
+ listener.statusChanged(status);
+ } catch (Throwable t) {
+ LOGGER.error(t, "Fail to notify {} about channel connected.", listener.getClass().getName());
+ }
+ }
+ }
+ }
+
+ /**
+ * Create a new gRPC channel to the specified server and reset connection state.
+ */
+ private void createNewChannel(String host, int port) throws Exception {
+ if (managedChannel != null) {
+ managedChannel.shutdownNow();
+ }
+
+ managedChannel = GRPCChannel.newBuilder(host, port)
+ .addManagedChannelBuilder(new StandardChannelBuilder())
+ .addManagedChannelBuilder(new TLSChannelBuilder())
+ .addChannelDecorator(new AgentIDDecorator())
+ .addChannelDecorator(new AuthenticationDecorator())
+ .build();
+
+ markAsConnected();
+ }
+
+ /**
+ * Trigger reconnection by setting reconnect flag and notifying listeners.
+ */
+ private void triggerReconnect() {
+ synchronized (statusLock) {
reconnect = true;
notify(GRPCChannelStatus.DISCONNECT);
}
}
- private void notify(GRPCChannelStatus status) {
- for (GRPCChannelListener listener : listeners) {
+ /**
+ * Mark connection as successful and reset connection state.
+ */
+ private void markAsConnected() {
+ synchronized (statusLock) {
+ reconnectCount = 0;
+ reconnect = false;
+ notify(GRPCChannelStatus.CONNECTED);
+ }
+ }
Review Comment:
This variable has been removed in the latest commit as we simplified the
reconnection logic per Wu-sheng's feedback.
##########
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java:
##########
@@ -184,17 +196,85 @@ public Channel getChannel() {
*/
public void reportError(Throwable throwable) {
if (isNetworkError(throwable)) {
+ triggerReconnect();
+ }
+ }
+
+ private void notify(GRPCChannelStatus status) {
+ synchronized (listeners) {
+ for (GRPCChannelListener listener : listeners) {
+ try {
+ listener.statusChanged(status);
+ } catch (Throwable t) {
+ LOGGER.error(t, "Fail to notify {} about channel connected.", listener.getClass().getName());
+ }
+ }
+ }
+ }
+
+ /**
+ * Create a new gRPC channel to the specified server and reset connection state.
+ */
+ private void createNewChannel(String host, int port) throws Exception {
+ if (managedChannel != null) {
+ managedChannel.shutdownNow();
+ }
+
+ managedChannel = GRPCChannel.newBuilder(host, port)
+ .addManagedChannelBuilder(new StandardChannelBuilder())
+ .addManagedChannelBuilder(new TLSChannelBuilder())
+ .addChannelDecorator(new AgentIDDecorator())
+ .addChannelDecorator(new AuthenticationDecorator())
+ .build();
+
+ markAsConnected();
+ }
+
+ /**
+ * Trigger reconnection by setting reconnect flag and notifying listeners.
+ */
+ private void triggerReconnect() {
+ synchronized (statusLock) {
reconnect = true;
notify(GRPCChannelStatus.DISCONNECT);
}
}
- private void notify(GRPCChannelStatus status) {
- for (GRPCChannelListener listener : listeners) {
+ /**
+ * Mark connection as successful and reset connection state.
+ */
+ private void markAsConnected() {
+ synchronized (statusLock) {
+ reconnectCount = 0;
+ reconnect = false;
+ notify(GRPCChannelStatus.CONNECTED);
+ }
+ }
+
+ /**
+ * Check the connectivity state of existing channel and trigger reconnect if needed.
+ * This method monitors TRANSIENT_FAILURE state and triggers reconnect if the failure persists too long.
+ */
+ private void checkChannelStateAndTriggerReconnectIfNeeded() {
+ if (managedChannel != null) {
try {
- listener.statusChanged(status);
+ ConnectivityState state = managedChannel.getState(false);
+ LOGGER.debug("Current channel state: {}", state);
+
+ if (state == ConnectivityState.TRANSIENT_FAILURE) {
+ transientFailureCount++;
+ LOGGER.warn("Channel in TRANSIENT_FAILURE state, count: {}", transientFailureCount);
+ } else if (state == ConnectivityState.SHUTDOWN) {
+ LOGGER.warn("Channel is SHUTDOWN");
+ if (!reconnect) {
+ triggerReconnect();
+ }
+ } else {
+ // IDLE, READY, CONNECTING are all normal states
+ transientFailureCount = 0;
+ }
Review Comment:
This variable has been removed in the latest commit as we simplified the
reconnection logic per Wu-sheng's feedback.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]