luoluoyuyu commented on code in PR #15295:
URL: https://github.com/apache/iotdb/pull/15295#discussion_r2052090112
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java:
##########
@@ -101,44 +104,51 @@ public PipeTaskProcessorStage(
// old one, so we need creationTime to make their hash code different in
the map.
final String taskId = pipeName + "_" + regionId + "_" + creationTime;
final boolean isUsedForConsensusPipe =
pipeName.contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX);
- final PipeEventCollector pipeConnectorOutputEventCollector =
- new PipeEventCollector(
- pipeConnectorOutputPendingQueue,
- creationTime,
- regionId,
- forceTabletFormat,
- skipParsing,
- isUsedForConsensusPipe);
- this.pipeProcessorSubtask =
- new PipeProcessorSubtask(
- taskId,
- pipeName,
- creationTime,
- regionId,
- pipeExtractorInputEventSupplier,
- pipeProcessor,
- pipeConnectorOutputEventCollector);
+ final int taskNum =
+ pipeProcessorParameters.getIntOrDefault(
+ PipeProcessorConstant.PROCESSOR_IOTDB_PARALLEL_TASKS_KEY,
+
PipeProcessorConstant.PROCESSOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
+
+ for (int i = 0; i < taskNum; ++i) {
+ final PipeEventCollector pipeConnectorOutputEventCollector =
+ new PipeEventCollector(
+ pipeConnectorOutputPendingQueue,
+ creationTime,
+ regionId,
+ forceTabletFormat,
+ skipParsing,
+ isUsedForConsensusPipe);
+ this.pipeProcessorSubtasks.add(
+ new PipeProcessorSubtask(
+ taskId,
+ pipeName,
+ creationTime,
+ regionId,
+ pipeExtractorInputEventSupplier,
Review Comment:
There will be some problems here. For example, the change of the state
machine will become more complicated, which may introduce unexpected problems.
Here you need to consider whether the Supply function needs to be modified.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java:
##########
@@ -101,44 +104,51 @@ public PipeTaskProcessorStage(
// old one, so we need creationTime to make their hash code different in
the map.
final String taskId = pipeName + "_" + regionId + "_" + creationTime;
final boolean isUsedForConsensusPipe =
pipeName.contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX);
- final PipeEventCollector pipeConnectorOutputEventCollector =
- new PipeEventCollector(
- pipeConnectorOutputPendingQueue,
- creationTime,
- regionId,
- forceTabletFormat,
- skipParsing,
- isUsedForConsensusPipe);
- this.pipeProcessorSubtask =
- new PipeProcessorSubtask(
- taskId,
- pipeName,
- creationTime,
- regionId,
- pipeExtractorInputEventSupplier,
- pipeProcessor,
- pipeConnectorOutputEventCollector);
+ final int taskNum =
+ pipeProcessorParameters.getIntOrDefault(
+ PipeProcessorConstant.PROCESSOR_IOTDB_PARALLEL_TASKS_KEY,
+
PipeProcessorConstant.PROCESSOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
Review Comment:
This parameter should not be greater than the number of SubTask threads. A
check is required.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]