Caideyipi commented on code in PR #15295:
URL: https://github.com/apache/iotdb/pull/15295#discussion_r2052333054
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java:
##########
@@ -101,44 +104,51 @@ public PipeTaskProcessorStage(
// old one, so we need creationTime to make their hash code different in
the map.
final String taskId = pipeName + "_" + regionId + "_" + creationTime;
final boolean isUsedForConsensusPipe =
pipeName.contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX);
- final PipeEventCollector pipeConnectorOutputEventCollector =
- new PipeEventCollector(
- pipeConnectorOutputPendingQueue,
- creationTime,
- regionId,
- forceTabletFormat,
- skipParsing,
- isUsedForConsensusPipe);
- this.pipeProcessorSubtask =
- new PipeProcessorSubtask(
- taskId,
- pipeName,
- creationTime,
- regionId,
- pipeExtractorInputEventSupplier,
- pipeProcessor,
- pipeConnectorOutputEventCollector);
+ final int taskNum =
+ pipeProcessorParameters.getIntOrDefault(
+ PipeProcessorConstant.PROCESSOR_IOTDB_PARALLEL_TASKS_KEY,
+
PipeProcessorConstant.PROCESSOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
Review Comment:
I think technically, processor tasks can be greater than it. And it can also
be a way to allocate CPU amongst different processing pipes
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]