Caideyipi commented on code in PR #15048:
URL: https://github.com/apache/iotdb/pull/15048#discussion_r1986995573


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java:
##########
@@ -453,6 +464,11 @@ private void transferQueuedEventsIfNecessary() throws Exception {
           LOGGER.debug("Polled event {} from retry queue.", polledEvent);
         }
       }
+
+      // Stop retrying if the execution time exceeds the threshold for better realtime performance
+      if (System.currentTimeMillis() - retryStartTime > maxRetryExecutionTimeMsPerCall) {
+        break;
+      }
     }
 
     // Trigger cron heartbeat event in retry connector to send batch in time

Review Comment:
   Consider triggering this even if the retryEventQueue is empty?
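
   To illustrate the suggestion, here is a minimal sketch of a time-budgeted
   drain loop whose heartbeat runs unconditionally, including when the queue
   is empty. It is not the connector's actual code: the class
   `RetryDrainSketch`, the method `drain`, and the `transfer` and `heartbeat`
   callbacks are hypothetical stand-ins for the real retry and cron heartbeat
   logic.

```java
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical sketch; names here are illustrative, not IoTDB APIs.
final class RetryDrainSketch {

  /**
   * Drains the retry queue until it is empty or the time budget is exceeded,
   * then always runs the heartbeat, even if nothing was polled.
   *
   * @return the number of events handed to the transfer callback
   */
  static <E> int drain(
      final Queue<E> retryQueue,
      final long maxExecutionTimeMs,
      final Consumer<E> transfer,
      final Runnable heartbeat) {
    final long startTime = System.currentTimeMillis();
    int processed = 0;

    while (!retryQueue.isEmpty()) {
      final E event = retryQueue.poll();
      transfer.accept(event);
      processed++;

      // Stop retrying once the per-call budget is used up.
      if (System.currentTimeMillis() - startTime > maxExecutionTimeMs) {
        break;
      }
    }

    // Runs regardless of whether the queue had any events.
    heartbeat.run();
    return processed;
  }
}
```

   Placing the heartbeat after the loop, rather than behind an emptiness
   check, lets a pending batch be sent in time even when no retries are
   queued.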



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java:
##########
@@ -111,6 +111,54 @@ protected Map<Integer, PipeTask> buildPipeTasks(final PipeMeta pipeMetaFromConfi
     return new PipeDataNodeBuilder(pipeMetaFromConfigNode).build();
   }
 
+  ////////////////////////// Manage by Pipe Name //////////////////////////
+
+  @Override
+  protected void startPipe(final String pipeName, final long creationTime) {
+    final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+    final PipeStatus status = existedPipeMeta.getRuntimeMeta().getStatus().get();
+    if (PipeStatus.STOPPED.equals(status) || status == null) {
+      restartPipeToReloadResourceIfNeeded(existedPipeMeta);
+    }
+
+    super.startPipe(pipeName, creationTime);
+  }
+
+  private void restartPipeToReloadResourceIfNeeded(final PipeMeta pipeMeta) {
+    if (System.currentTimeMillis() - pipeMeta.getStaticMeta().getCreationTime()
+        < PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()) {
+      return;
+    }
+
+    final AtomicLong lastRestartTime =
+        PIPE_NAME_TO_LAST_RESTART_TIME_MAP.get(pipeMeta.getStaticMeta().getPipeName());
+    if (lastRestartTime != null
+        && System.currentTimeMillis() - lastRestartTime.get()
+            < PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()) {
+      LOGGER.info(
+          "Skipping reload resource for stopped pipe {} before starting it because reloading resource is too frequent.",
+          pipeMeta.getStaticMeta().getPipeName());
+      return;
+    }
+
+    if (PIPE_NAME_TO_LAST_RESTART_TIME_MAP.isEmpty()) {
+      LOGGER.info(
+          "Flushing storage engine before restarting pipe {}.",
+          pipeMeta.getStaticMeta().getPipeName());
+      final long currentTime = System.currentTimeMillis();
+      StorageEngine.getInstance().syncCloseAllProcessor();
+      WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();

Review Comment:
   This seems to be for dynamic flushing. Will it produce more small tsFiles
   (i.e., one flush here and another when collecting historical tsFiles)?
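
   For context on how often such a flush could fire, the per-pipe minimum
   interval check in the hunk above can be isolated as the sketch below. It
   is an assumption-laden illustration, not the PR's code:
   `RestartThrottleSketch` and `tryAcquire` are hypothetical names, and the
   clock is passed in as a parameter so the behavior is easy to check.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a per-pipe minimum-interval throttle.
final class RestartThrottleSketch {

  private final Map<String, AtomicLong> lastRestartTimeByPipe = new ConcurrentHashMap<>();
  private final long minIntervalMs;

  RestartThrottleSketch(final long minIntervalMs) {
    this.minIntervalMs = minIntervalMs;
  }

  /**
   * Returns true and records nowMs if the pipe has never been restarted or
   * its last recorded restart is at least minIntervalMs old; otherwise
   * returns false and leaves the record unchanged.
   *
   * Note: the check and the update are not one atomic step, so this sketch
   * assumes a single caller per pipe name.
   */
  boolean tryAcquire(final String pipeName, final long nowMs) {
    final AtomicLong last = lastRestartTimeByPipe.get(pipeName);
    if (last != null && nowMs - last.get() < minIntervalMs) {
      return false;
    }
    lastRestartTimeByPipe.computeIfAbsent(pipeName, k -> new AtomicLong()).set(nowMs);
    return true;
  }
}
```

   With a throttle like this, an expensive step such as a flush runs at most
   once per interval for each pipe, which bounds how many extra small files
   it could produce.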



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java:
##########
@@ -223,6 +226,10 @@ private boolean canNotUseTabletAnyMore(final PipeRealtimeEvent event) {
   }
 
   private boolean isPipeTaskCurrentlyRestarted(final PipeRealtimeEvent event) {
+    if (!isPipeEpochKeepTsFileAfterStuckRestartEnabled) {
+      return false;
+    }
+
     final boolean isPipeTaskCurrentlyRestarted =
         PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName);
     if (isPipeTaskCurrentlyRestarted && event.mayExtractorUseTablets(this)) {

Review Comment:
   It may be better to reduce this logging to once per epoch.
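
   One possible shape for this, as a rough sketch: remember the last epoch
   that was logged and emit the message only when the epoch changes. The
   class `OncePerEpochSketch`, the method `shouldLog`, and the use of a
   `String` epoch identifier are all assumptions made for illustration; the
   real epoch type in the extractor may differ.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; the String epoch id is an assumed stand-in.
final class OncePerEpochSketch {

  private final AtomicReference<String> lastLoggedEpoch = new AtomicReference<>();

  /**
   * Returns true only when epochId differs from the epoch seen on the
   * previous call, so consecutive events of the same epoch log once.
   * Events that alternate between epochs would log on every switch.
   */
  boolean shouldLog(final String epochId) {
    final String previous = lastLoggedEpoch.getAndSet(epochId);
    return !epochId.equals(previous);
  }
}
```

   A caller would wrap the existing log statement in
   `if (sketch.shouldLog(epochId)) { ... }`.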



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
