Caideyipi commented on code in PR #15048:
URL: https://github.com/apache/iotdb/pull/15048#discussion_r1986995573
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java:
##########
@@ -453,6 +464,11 @@ private void transferQueuedEventsIfNecessary() throws
Exception {
LOGGER.debug("Polled event {} from retry queue.", polledEvent);
}
}
+
+ // Stop retrying if the execution time exceeds the threshold for better
realtime performance
+ if (System.currentTimeMillis() - retryStartTime >
maxRetryExecutionTimeMsPerCall) {
+ break;
+ }
}
// Trigger cron heartbeat event in retry connector to send batch in time
Review Comment:
Consider triggering this even if the retryEventQueue is empty?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java:
##########
@@ -111,6 +111,54 @@ protected Map<Integer, PipeTask> buildPipeTasks(final
PipeMeta pipeMetaFromConfi
return new PipeDataNodeBuilder(pipeMetaFromConfigNode).build();
}
+ ////////////////////////// Manage by Pipe Name //////////////////////////
+
+ @Override
+ protected void startPipe(final String pipeName, final long creationTime) {
+ final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+ final PipeStatus status =
existedPipeMeta.getRuntimeMeta().getStatus().get();
+ if (PipeStatus.STOPPED.equals(status) || status == null) {
+ restartPipeToReloadResourceIfNeeded(existedPipeMeta);
+ }
+
+ super.startPipe(pipeName, creationTime);
+ }
+
+ private void restartPipeToReloadResourceIfNeeded(final PipeMeta pipeMeta) {
+ if (System.currentTimeMillis() - pipeMeta.getStaticMeta().getCreationTime()
+ < PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()) {
+ return;
+ }
+
+ final AtomicLong lastRestartTime =
+
PIPE_NAME_TO_LAST_RESTART_TIME_MAP.get(pipeMeta.getStaticMeta().getPipeName());
+ if (lastRestartTime != null
+ && System.currentTimeMillis() - lastRestartTime.get()
+ < PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()) {
+ LOGGER.info(
+ "Skipping reload resource for stopped pipe {} before starting it
because reloading resource is too frequent.",
+ pipeMeta.getStaticMeta().getPipeName());
+ return;
+ }
+
+ if (PIPE_NAME_TO_LAST_RESTART_TIME_MAP.isEmpty()) {
+ LOGGER.info(
+ "Flushing storage engine before restarting pipe {}.",
+ pipeMeta.getStaticMeta().getPipeName());
+ final long currentTime = System.currentTimeMillis();
+ StorageEngine.getInstance().syncCloseAllProcessor();
+ WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
Review Comment:
Seemingly this is for dynamic flushing? Will this cause more small
tsFiles?(i.e. flush here, and when collecting historical tsFiles)
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java:
##########
@@ -223,6 +226,10 @@ private boolean canNotUseTabletAnyMore(final
PipeRealtimeEvent event) {
}
private boolean isPipeTaskCurrentlyRestarted(final PipeRealtimeEvent event) {
+ if (!isPipeEpochKeepTsFileAfterStuckRestartEnabled) {
+ return false;
+ }
+
final boolean isPipeTaskCurrentlyRestarted =
PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName);
if (isPipeTaskCurrentlyRestarted && event.mayExtractorUseTablets(this)) {
Review Comment:
May better reduce the logger to "per epoch"
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]