Copilot commented on code in PR #17293:
URL: https://github.com/apache/iotdb/pull/17293#discussion_r2925195586


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java:
##########
@@ -67,7 +83,10 @@ public TPipeSubscribeResp handle(final TPipeSubscribeReq req) {
 
     final byte reqVersion = req.getVersion();
     if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
-      return getReceiver(reqVersion).handle(req);
+      final SubscriptionReceiver receiver = getReceiver(reqVersion);
+      activeReceivers.add(receiver);
+      receiver.handleTimeout();

Review Comment:
   `receiver.handleTimeout()` is invoked before `receiver.handle(req)`. Because `SubscriptionReceiverV1` only updates `lastActivityTimeMs` inside `handle()` (via `beforeHandle`), a request that arrives right after a long idle period (including a HEARTBEAT) can be treated as timed out and closed *before* the receiver records the new activity. This can cause spurious server-side closes and cause subsequent requests to fail due to missing consumer state.
   
   Consider removing the per-request `handleTimeout()` call (relying on the scheduled checker instead), or moving the timeout evaluation to after the request has updated activity (e.g., inside `SubscriptionReceiverV1.handle()`, after `beforeHandle`).
   ```suggestion
   
   ```
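   If the second option is taken, the key invariant is that the activity timestamp is refreshed *before* the timeout is evaluated. A minimal sketch (hypothetical, simplified names; not the actual receiver API) of that ordering:

   ```java
   import java.util.concurrent.atomic.AtomicLong;

   // Sketch: refresh lastActivityTimeMs first (as beforeHandle would), then
   // evaluate the inactivity timeout, so a request arriving after a long idle
   // period can never be judged timed out by its own handling path.
   public class TimeoutOrderingSketch {
     private final AtomicLong lastActivityTimeMs = new AtomicLong(System.currentTimeMillis());
     private final long timeoutMs;

     public TimeoutOrderingSketch(final long timeoutMs) {
       this.timeoutMs = timeoutMs;
     }

     /** Record the new activity first, then check the timeout. */
     public boolean handle(final long nowMs) {
       lastActivityTimeMs.set(nowMs); // mirrors beforeHandle: record activity
       return isTimedOut(nowMs);      // evaluated after the refresh -> false here
     }

     public boolean isTimedOut(final long nowMs) {
       return nowMs - lastActivityTimeMs.get() > timeoutMs;
     }

     public static void main(final String[] args) {
       final TimeoutOrderingSketch r = new TimeoutOrderingSketch(1000L);
       final long now = System.currentTimeMillis();
       // A request after 50 s of idleness is NOT treated as timed out,
       // because its own activity refresh happens before the check.
       System.out.println(r.handle(now + 50_000L)); // prints "false"
     }
   }
   ```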



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java:
##########
@@ -171,6 +184,41 @@ public void handleExit() {
       unsubscribeCompleteTopics(consumerConfig);
       consumerConfigThreadLocal.remove();
     }
+    clearSharedConsumerState();
+  }
+
+  @Override
+  public void handleTimeout() {
+    final ConsumerConfig consumerConfig;
+    final long inactiveMs;
+    final long timeoutMs;
+    synchronized (this) {
+      consumerConfig = sharedConsumerConfig;
+      if (Objects.isNull(consumerConfig) || inFlightRequestCount.get() > 0) {
+        return;
+      }
+      timeoutMs = calculateConsumerInactivityTimeoutMs(consumerConfig);
+      inactiveMs = System.currentTimeMillis() - lastActivityTimeMs;
+      if (inactiveMs <= timeoutMs) {
+        return;
+      }
+      clearSharedConsumerState();
+    }

Review Comment:
   `handleTimeout()` clears the shared consumer state 
(`clearSharedConsumerState()`) before attempting `closeConsumer(...)`. If 
`closeConsumer` fails (e.g., ConfigNode RPC issue or unsubscribe error), the 
receiver has already discarded the config, so subsequent timeout checks won’t 
retry and the consumer may remain leaked/subscribed.
   
   Consider only clearing shared state after a successful close, or retaining a 
“pending close” state to retry with backoff.
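   A rough sketch of the "pending close" variant (hypothetical names, with the ConfigNode RPC stubbed out): the config is retained until `closeConsumer` succeeds, so a transient failure is retried by the scheduled checker instead of leaking the consumer.

   ```java
   import java.util.Objects;
   import java.util.concurrent.atomic.AtomicReference;

   // Sketch: keep the timed-out consumer's config as "pending close" and clear
   // it only after closeConsumer succeeds, so failed closes are retried.
   public class PendingCloseSketch {
     public static final class ConsumerConfig {
       final String consumerId;
       public ConsumerConfig(final String consumerId) { this.consumerId = consumerId; }
     }

     private final AtomicReference<ConsumerConfig> pendingClose = new AtomicReference<>();
     private int failuresToSimulate; // stand-in for transient ConfigNode RPC errors

     public PendingCloseSketch(final int failuresToSimulate) {
       this.failuresToSimulate = failuresToSimulate;
     }

     /** Timeout detected: remember the config instead of discarding it. */
     public void onTimeout(final ConsumerConfig config) {
       pendingClose.set(config);
       tryClose();
     }

     /** Called by the scheduled checker; clears state only after a successful close. */
     public boolean tryClose() {
       final ConsumerConfig config = pendingClose.get();
       if (Objects.isNull(config)) {
         return true; // nothing pending
       }
       if (closeConsumer(config)) {
         pendingClose.compareAndSet(config, null); // discard only on success
         return true;
       }
       return false; // config retained; the next scheduled check retries
     }

     public boolean hasPending() {
       return pendingClose.get() != null;
     }

     // Placeholder for the real close path (unsubscribe + ConfigNode drop).
     private boolean closeConsumer(final ConsumerConfig config) {
       if (failuresToSimulate > 0) {
         failuresToSimulate--;
         return false;
       }
       return true;
     }
   }
   ```

   The retry-with-backoff refinement would only add a "next attempt at" timestamp next to the pending config; the essential point is that state is never discarded on a failed close.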



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java:
##########
@@ -171,6 +184,41 @@ public void handleExit() {
       unsubscribeCompleteTopics(consumerConfig);
       consumerConfigThreadLocal.remove();
     }
+    clearSharedConsumerState();
+  }
+
+  @Override
+  public void handleTimeout() {
+    final ConsumerConfig consumerConfig;
+    final long inactiveMs;
+    final long timeoutMs;
+    synchronized (this) {
+      consumerConfig = sharedConsumerConfig;
+      if (Objects.isNull(consumerConfig) || inFlightRequestCount.get() > 0) {
+        return;
+      }
+      timeoutMs = calculateConsumerInactivityTimeoutMs(consumerConfig);
+      inactiveMs = System.currentTimeMillis() - lastActivityTimeMs;
+      if (inactiveMs <= timeoutMs) {
+        return;
+      }
+      clearSharedConsumerState();
+    }
+
+    LOGGER.info(
+        "Subscription: consumer {} is inactive for {} ms, exceeding timeout {} ms, close it on server side.",
+        consumerConfig,
+        inactiveMs,
+        timeoutMs);

Review Comment:
   Timeout-triggered `closeConsumer(consumerConfig)` can race with a concurrent 
HANDSHAKE that re-creates/activates a consumer with the same 
consumerGroupId/consumerId. Since `closeConsumer` drops the consumer by IDs via 
ConfigNode, it can end up closing the newly created consumer if the handshake 
happens while the timeout close is in progress.
   
   Consider guarding `handleTimeout` with a per-consumer/receiver “epoch” 
(incremented on `activateConsumer`) and only closing if the epoch is unchanged, 
or serialize timeout closes vs. handshakes with a dedicated close-in-progress 
flag/lock.
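   The epoch-guard idea can be sketched roughly as follows (hypothetical names; the real code would carry the epoch alongside the shared consumer state and perform the actual ConfigNode close where indicated):

   ```java
   import java.util.concurrent.atomic.AtomicLong;

   // Sketch: activateConsumer (handshake) bumps a per-receiver epoch; the
   // timeout path captures the epoch it observed and abandons the close if a
   // handshake has re-activated the consumer in the meantime, so the freshly
   // created consumer with the same consumerGroupId/consumerId is not dropped.
   public class EpochGuardSketch {
     private final AtomicLong epoch = new AtomicLong();

     /** Handshake path: re-activation starts a new epoch. */
     public long activateConsumer() {
       return epoch.incrementAndGet();
     }

     /** Timeout path: close only if no handshake intervened since observedEpoch. */
     public boolean closeIfEpochUnchanged(final long observedEpoch) {
       if (epoch.get() != observedEpoch) {
         return false; // a new consumer took over; skip the close
       }
       // ... perform closeConsumer(...) against ConfigNode here ...
       return true;
     }
   }
   ```

   As written this only narrows the race window; fully closing it would require checking the epoch and closing under the same lock that handshakes take (the "close-in-progress flag/lock" alternative above).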



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
