jt2594838 commented on code in PR #15599:
URL: https://github.com/apache/iotdb/pull/15599#discussion_r2122465409
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/runtime/PipePeriodicalJobExecutor.java:
##########
@@ -21,15 +21,47 @@
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
/**
* The shortest scheduling cycle for these jobs is {@link
* PipeConfig#getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds()},
suitable for jobs that are
* NOT time-critical.
*/
public class PipePeriodicalJobExecutor extends
AbstractPipePeriodicalJobExecutor {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipePeriodicalJobExecutor.class);
+ // This background service is used to execute jobs that need to be cancelled
and released.
+ private static final ScheduledExecutorService backgroundService =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.PIPE_PROGRESS_INDEX_BACKGROUND_SERVICE.getName());
+
+ public static Future<?> submitBackgroundJob(
+ Runnable job, long initialDelayInMs, long periodInMs) {
+ return ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ backgroundService, job, initialDelayInMs, periodInMs,
TimeUnit.MILLISECONDS);
+ }
+
+ public static void shutdownBackgroundService() {
+ backgroundService.shutdownNow();
+ try {
+ if (!backgroundService.awaitTermination(30, TimeUnit.SECONDS)) {
+ LOGGER.warn("Pipe progressIndex background service did not terminate
within {}s", 30);
+ }
+ } catch (InterruptedException e) {
+ LOGGER.warn("Pipe progressIndex background service still doesn't exit
after 30s");
Review Comment:
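This message is imprecise: an InterruptedException means the waiting thread was interrupted while awaiting termination, not that the service failed to exit within 30s. Please reword it (and consider restoring the interrupt flag).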
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java:
##########
@@ -58,18 +88,105 @@ public class PipeTaskMeta {
private final Set<PipeRuntimeException> exceptionMessages =
Collections.newSetFromMap(new ConcurrentHashMap<>());
- public PipeTaskMeta(/* @NotNull */ final ProgressIndex progressIndex, final
int leaderNodeId) {
+ public PipeTaskMeta(
+ /* @NotNull */ final ProgressIndex progressIndex,
+ final int leaderNodeId,
+ final int taskIndex,
+ final boolean needPersistProgressIndex) {
this.progressIndex.set(progressIndex);
this.leaderNodeId.set(leaderNodeId);
+ this.taskIndex = taskIndex;
+ // PipeTaskMeta created in configNode doesn't need to persist progress
index.
+ this.needPersistProgressIndex = needPersistProgressIndex;
+ this.progressIndexPersistFile =
+ new File(
+ IoTDBConstant.DN_DEFAULT_DATA_DIR
+ + File.separator
+ + IoTDBConstant.SYSTEM_FOLDER_NAME
+ + File.separator
+ + PipeConfig.getInstance().getPipeHardlinkBaseDirName()
+ + File.separator
+ +
PipeConfig.getInstance().getPipeProgressIndexPersistDirName(),
+ PREFIX + taskIndex);
}
public ProgressIndex getProgressIndex() {
return progressIndex.get();
}
public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) {
- return progressIndex.updateAndGet(
+ // only pipeTaskMeta that need to updateProgressIndex will persist
progress index
+ // isRegisterPersistTask is used to avoid multiple threads registering
persist task concurrently
+ if (needPersistProgressIndex
+ && !isRegisterPersistTask.getAndSet(true)
+ && this.persistProgressIndexFuture == null) {
+ this.persistProgressIndexFuture =
+ PipePeriodicalJobExecutor.submitBackgroundJob(
+ this::persistProgressIndex, 0, TimeUnit.SECONDS.toMillis(20));
Review Comment:
Could 20ms be too frequent, considering that persistProgressIndex() is
synchronized?
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java:
##########
@@ -19,33 +19,63 @@
package org.apache.iotdb.commons.pipe.agent.task.meta;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeExceptionType;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
+import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
public class PipeTaskMeta {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTaskMeta.class);
+ private static final String PREFIX = "__progressIndex_";
+
private final AtomicReference<ProgressIndex> progressIndex = new
AtomicReference<>();
private final AtomicInteger leaderNodeId = new AtomicInteger(0);
+ private final AtomicLong updateCount = new AtomicLong(0);
+ private final AtomicLong lastPersistCount = new AtomicLong(0);
+ private final long checkPointGap =
+ PipeConfig.getInstance().getPipeProgressIndexPersistCheckPoint();
Review Comment:
Please rename PipeProgressIndexPersistCheckPoint to PipeProgressIndexPersistCheckPointGap so the config name matches the `checkPointGap` field it populates.
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java:
##########
@@ -58,18 +88,105 @@ public class PipeTaskMeta {
private final Set<PipeRuntimeException> exceptionMessages =
Collections.newSetFromMap(new ConcurrentHashMap<>());
- public PipeTaskMeta(/* @NotNull */ final ProgressIndex progressIndex, final
int leaderNodeId) {
+ public PipeTaskMeta(
+ /* @NotNull */ final ProgressIndex progressIndex,
+ final int leaderNodeId,
+ final int taskIndex,
+ final boolean needPersistProgressIndex) {
this.progressIndex.set(progressIndex);
this.leaderNodeId.set(leaderNodeId);
+ this.taskIndex = taskIndex;
+ // PipeTaskMeta created in configNode doesn't need to persist progress
index.
+ this.needPersistProgressIndex = needPersistProgressIndex;
+ this.progressIndexPersistFile =
+ new File(
+ IoTDBConstant.DN_DEFAULT_DATA_DIR
+ + File.separator
+ + IoTDBConstant.SYSTEM_FOLDER_NAME
+ + File.separator
+ + PipeConfig.getInstance().getPipeHardlinkBaseDirName()
+ + File.separator
+ +
PipeConfig.getInstance().getPipeProgressIndexPersistDirName(),
+ PREFIX + taskIndex);
Review Comment:
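Should this file only be created `if (needPersistProgressIndex)`? PipeTaskMeta instances created in the ConfigNode never persist their progress index, so they don't need a persist file.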
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java:
##########
@@ -58,18 +88,105 @@ public class PipeTaskMeta {
private final Set<PipeRuntimeException> exceptionMessages =
Collections.newSetFromMap(new ConcurrentHashMap<>());
- public PipeTaskMeta(/* @NotNull */ final ProgressIndex progressIndex, final
int leaderNodeId) {
+ public PipeTaskMeta(
+ /* @NotNull */ final ProgressIndex progressIndex,
+ final int leaderNodeId,
+ final int taskIndex,
+ final boolean needPersistProgressIndex) {
this.progressIndex.set(progressIndex);
this.leaderNodeId.set(leaderNodeId);
+ this.taskIndex = taskIndex;
+ // PipeTaskMeta created in configNode doesn't need to persist progress
index.
+ this.needPersistProgressIndex = needPersistProgressIndex;
+ this.progressIndexPersistFile =
+ new File(
+ IoTDBConstant.DN_DEFAULT_DATA_DIR
+ + File.separator
+ + IoTDBConstant.SYSTEM_FOLDER_NAME
+ + File.separator
+ + PipeConfig.getInstance().getPipeHardlinkBaseDirName()
+ + File.separator
+ +
PipeConfig.getInstance().getPipeProgressIndexPersistDirName(),
+ PREFIX + taskIndex);
}
public ProgressIndex getProgressIndex() {
return progressIndex.get();
}
public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) {
- return progressIndex.updateAndGet(
+ // only pipeTaskMeta that need to updateProgressIndex will persist
progress index
+ // isRegisterPersistTask is used to avoid multiple threads registering
persist task concurrently
+ if (needPersistProgressIndex
+ && !isRegisterPersistTask.getAndSet(true)
+ && this.persistProgressIndexFuture == null) {
+ this.persistProgressIndexFuture =
+ PipePeriodicalJobExecutor.submitBackgroundJob(
+ this::persistProgressIndex, 0, TimeUnit.SECONDS.toMillis(20));
+ }
+
+ progressIndex.updateAndGet(
index ->
index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex));
+ if (needPersistProgressIndex
+ && updateCount.incrementAndGet() - lastPersistCount.get() >
checkPointGap) {
+ persistProgressIndex();
+ }
+ return progressIndex.get();
+ }
+
+ private synchronized void persistProgressIndex() {
+ if (lastPersistCount.get() == updateCount.get()) {
+ // in case of multiple threads calling updateProgressIndex at the same
time
+ return;
+ }
+
+ try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ progressIndex.get().serialize(outputStream);
+ // append is false by default.
+ FileUtils.writeByteArrayToFile(
+ progressIndexPersistFile, byteArrayOutputStream.toByteArray(),
false);
Review Comment:
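Consider writing the internal buffer directly, e.g.
`FileUtils.writeByteArrayToFile(progressIndexPersistFile,
byteArrayOutputStream.getBuf(), byteArrayOutputStream.size(), false);`, to avoid
the buffer copy made by `toByteArray()`. Commons IO already provides the overload
`writeByteArrayToFile(File, byte[], int off, int len, boolean append)`, so no new
helper is needed: pass `0` as the offset before `size()`.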
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java:
##########
@@ -58,18 +88,105 @@ public class PipeTaskMeta {
private final Set<PipeRuntimeException> exceptionMessages =
Collections.newSetFromMap(new ConcurrentHashMap<>());
- public PipeTaskMeta(/* @NotNull */ final ProgressIndex progressIndex, final
int leaderNodeId) {
+ public PipeTaskMeta(
+ /* @NotNull */ final ProgressIndex progressIndex,
+ final int leaderNodeId,
+ final int taskIndex,
+ final boolean needPersistProgressIndex) {
this.progressIndex.set(progressIndex);
this.leaderNodeId.set(leaderNodeId);
+ this.taskIndex = taskIndex;
+ // PipeTaskMeta created in configNode doesn't need to persist progress
index.
+ this.needPersistProgressIndex = needPersistProgressIndex;
+ this.progressIndexPersistFile =
+ new File(
+ IoTDBConstant.DN_DEFAULT_DATA_DIR
+ + File.separator
+ + IoTDBConstant.SYSTEM_FOLDER_NAME
+ + File.separator
+ + PipeConfig.getInstance().getPipeHardlinkBaseDirName()
+ + File.separator
+ +
PipeConfig.getInstance().getPipeProgressIndexPersistDirName(),
+ PREFIX + taskIndex);
}
public ProgressIndex getProgressIndex() {
return progressIndex.get();
}
public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) {
- return progressIndex.updateAndGet(
+ // only pipeTaskMeta that need to updateProgressIndex will persist
progress index
+ // isRegisterPersistTask is used to avoid multiple threads registering
persist task concurrently
+ if (needPersistProgressIndex
+ && !isRegisterPersistTask.getAndSet(true)
+ && this.persistProgressIndexFuture == null) {
+ this.persistProgressIndexFuture =
+ PipePeriodicalJobExecutor.submitBackgroundJob(
+ this::persistProgressIndex, 0, TimeUnit.SECONDS.toMillis(20));
+ }
+
+ progressIndex.updateAndGet(
index ->
index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex));
+ if (needPersistProgressIndex
+ && updateCount.incrementAndGet() - lastPersistCount.get() >
checkPointGap) {
+ persistProgressIndex();
+ }
+ return progressIndex.get();
+ }
+
+ private synchronized void persistProgressIndex() {
+ if (lastPersistCount.get() == updateCount.get()) {
+ // in case of multiple threads calling updateProgressIndex at the same
time
+ return;
+ }
+
+ try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ progressIndex.get().serialize(outputStream);
+ // append is false by default.
+ FileUtils.writeByteArrayToFile(
+ progressIndexPersistFile, byteArrayOutputStream.toByteArray(),
false);
+ lastPersistCount.set(updateCount.get());
Review Comment:
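`updateCount` may be concurrently modified by `updateProgressIndex()` between
serializing the index and this `set()`. Is this okay? Updates made in that window would be marked as persisted even though they were not written.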
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java:
##########
@@ -113,6 +230,7 @@ public synchronized void serialize(final OutputStream
outputStream) throws IOExc
progressIndex.get().serialize(outputStream);
ReadWriteIOUtils.write(leaderNodeId.get(), outputStream);
+ ReadWriteIOUtils.write(taskIndex, outputStream);
Review Comment:
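Note the serialization incompatibility this may introduce: data serialized by older versions does not contain `taskIndex`, so reading it back must handle the field's absence.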
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java:
##########
@@ -58,18 +88,105 @@ public class PipeTaskMeta {
private final Set<PipeRuntimeException> exceptionMessages =
Collections.newSetFromMap(new ConcurrentHashMap<>());
- public PipeTaskMeta(/* @NotNull */ final ProgressIndex progressIndex, final
int leaderNodeId) {
+ public PipeTaskMeta(
+ /* @NotNull */ final ProgressIndex progressIndex,
+ final int leaderNodeId,
+ final int taskIndex,
+ final boolean needPersistProgressIndex) {
this.progressIndex.set(progressIndex);
this.leaderNodeId.set(leaderNodeId);
+ this.taskIndex = taskIndex;
+ // PipeTaskMeta created in configNode doesn't need to persist progress
index.
+ this.needPersistProgressIndex = needPersistProgressIndex;
+ this.progressIndexPersistFile =
+ new File(
+ IoTDBConstant.DN_DEFAULT_DATA_DIR
+ + File.separator
+ + IoTDBConstant.SYSTEM_FOLDER_NAME
+ + File.separator
+ + PipeConfig.getInstance().getPipeHardlinkBaseDirName()
+ + File.separator
+ +
PipeConfig.getInstance().getPipeProgressIndexPersistDirName(),
+ PREFIX + taskIndex);
}
public ProgressIndex getProgressIndex() {
return progressIndex.get();
}
public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) {
- return progressIndex.updateAndGet(
+ // only pipeTaskMeta that need to updateProgressIndex will persist
progress index
+ // isRegisterPersistTask is used to avoid multiple threads registering
persist task concurrently
+ if (needPersistProgressIndex
+ && !isRegisterPersistTask.getAndSet(true)
+ && this.persistProgressIndexFuture == null) {
+ this.persistProgressIndexFuture =
+ PipePeriodicalJobExecutor.submitBackgroundJob(
+ this::persistProgressIndex, 0, TimeUnit.SECONDS.toMillis(20));
+ }
+
+ progressIndex.updateAndGet(
index ->
index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex));
+ if (needPersistProgressIndex
+ && updateCount.incrementAndGet() - lastPersistCount.get() >
checkPointGap) {
+ persistProgressIndex();
+ }
+ return progressIndex.get();
+ }
+
+ private synchronized void persistProgressIndex() {
+ if (lastPersistCount.get() == updateCount.get()) {
+ // in case of multiple threads calling updateProgressIndex at the same
time
+ return;
+ }
+
+ try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
Review Comment:
Any chance we can reuse the buffer across calls instead of allocating a new PublicBAOS on every persist?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]