Caideyipi commented on code in PR #12355:
URL: https://github.com/apache/iotdb/pull/12355#discussion_r1615526255
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java:
##########
@@ -582,4 +586,47 @@ public Set<Integer> getPipeTaskRegionIdSet(final String pipeName, final long cre
? Collections.emptySet()
: pipeMeta.getRuntimeMeta().getConsensusGroupId2TaskMetaMap().keySet();
}
+
+ ///////////////////////// Pipe Consensus /////////////////////////
+
+ public ProgressIndex getPipeTaskProgressIndex(String pipeName, int consensusGroupId) {
+ if (!tryReadLockWithTimeOut(10)) {
+ throw new PipeException(
+ String.format(
+ "Failed to get pipe task progress index with pipe name: %s, consensus group id %s.",
+ pipeName, consensusGroupId));
+ }
+
+ try {
+ if (!pipeMetaKeeper.containsPipeMeta(pipeName)) {
+ throw new PipeException("Pipe meta not found: " + pipeName);
+ }
+
+ return pipeMetaKeeper
+ .getPipeMeta(pipeName)
+ .getRuntimeMeta()
+ .getConsensusGroupId2TaskMetaMap()
+ .get(consensusGroupId)
+ .getProgressIndex();
+ } finally {
+ releaseReadLock();
+ }
+ }
+
+ public Map<ConsensusPipeName, PipeStatus> getAllConsensusPipe() {
+ if (!tryReadLockWithTimeOut(10)) {
+ throw new PipeException("Failed to get all consensus pipe.");
Review Comment:
What will happen if you throw an exception in "buildConsensusConfig"?
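To make the concern concrete, here is a minimal sketch of one possible failure mode. It assumes the static builder is invoked from a static field initializer, which this diff does not show; `StaticInitFailureDemo`, `Holder`, and `buildConfig` are hypothetical stand-ins and not code from this PR. Under that assumption, an unchecked exception escaping the builder surfaces as an ExceptionInInitializerError on first access, and every later access to the class fails with NoClassDefFoundError.

```java
public class StaticInitFailureDemo {
  // Hypothetical holder: stands in for a class whose static field is
  // produced by a static builder method (assumption, not PR code).
  static class Holder {
    static final Object CONFIG = buildConfig();

    private static Object buildConfig() {
      // Simulates the builder failing, e.g. because a lock timed out.
      throw new IllegalStateException("Failed to get all consensus pipe.");
    }
  }

  /** Touches Holder and returns the simple name of whatever is thrown. */
  static String touchHolder() {
    try {
      Object unused = Holder.CONFIG;
      return "none";
    } catch (Throwable t) {
      return t.getClass().getSimpleName();
    }
  }

  public static void main(String[] args) {
    System.out.println(touchHolder()); // prints "ExceptionInInitializerError"
    System.out.println(touchHolder()); // prints "NoClassDefFoundError"
  }
}
```

If the real call site looks like this, the class stays unusable until the JVM restarts, so it may be worth catching the exception inside the builder or deferring the lookup.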
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ConsensusPipeDataNodeRuntimeAgentGuardian.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.consensus;
+
+import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeGuardian;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsensusPipeDataNodeRuntimeAgentGuardian implements ConsensusPipeGuardian {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ConsensusPipeDataNodeRuntimeAgentGuardian.class);
+ private boolean registered = false;
+ private String id;
+
+ @Override
+ public synchronized void start(String id, Runnable guardJob, long intervalInSeconds) {
+ if (!registered) {
+ LOGGER.info(
+ "Registering periodical job {} with interval in seconds {}.", id, intervalInSeconds);
Review Comment:
This log may duplicate what "PipePeriodicalJobExecutor" already reports.
Consider deleting it or enriching it with more precise information.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java:
##########
@@ -132,6 +139,39 @@ private static ConsensusConfig buildConsensusConfig() {
CONF.getRegionMigrationSpeedLimitBytesPerSecond())
.build())
.build())
+ .setPipeConsensusConfig(
+ PipeConsensusConfig.newBuilder()
+ .setRPC(
+ PipeConsensusConfig.RPC
+ .newBuilder()
+ .setConnectionTimeoutInMs(CONF.getConnectionTimeoutInMS())
+ .setRpcSelectorThreadNum(CONF.getRpcSelectorThreadCount())
+ .setRpcMinConcurrentClientNum(CONF.getRpcMinConcurrentClientNum())
+ .setRpcMaxConcurrentClientNum(CONF.getRpcMaxConcurrentClientNum())
+ .setIsRpcThriftCompressionEnabled(CONF.isRpcThriftCompressionEnable())
+ .setSelectorNumOfClientManager(CONF.getSelectorNumOfClientManager())
+ .setThriftServerAwaitTimeForStopService(
+ CONF.getThriftServerAwaitTimeForStopService())
+ .setThriftMaxFrameSize(CONF.getThriftMaxFrameSize())
+ .setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
Review Comment:
Be sure to check whether these configurations are suitable for your connector
algorithm. For example, by default there will be 300 concurrent clients per
receiver node.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ProgressIndexDataNodeManager.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.consensus;
+
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
+import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
+import org.apache.iotdb.consensus.pipe.consensuspipe.ProgressIndexManager;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+public class ProgressIndexDataNodeManager implements ProgressIndexManager {
+ private final Map<ConsensusGroupId, ProgressIndex> groupId2MaxProgressIndex;
+ private static final int DATA_NODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
+
+ public ProgressIndexDataNodeManager() {
+ this.groupId2MaxProgressIndex = new ConcurrentHashMap<>();
+
+ recoverMaxProgressIndexFromDataRegion();
+ }
+
+ private void recoverMaxProgressIndexFromDataRegion() {
+ StorageEngine.getInstance()
+ .getAllDataRegionIds()
+ .forEach(
+ dataRegionId -> {
+ final TsFileManager tsFileManager =
+ StorageEngine.getInstance().getDataRegion(dataRegionId).getTsFileManager();
+
+ final List<ProgressIndex> allProgressIndex = new ArrayList<>();
+ allProgressIndex.addAll(
+ tsFileManager.getTsFileList(true).stream()
+ .map(TsFileResource::getMaxProgressIndex)
+ .collect(Collectors.toList()));
+ allProgressIndex.addAll(
+ tsFileManager.getTsFileList(false).stream()
+ .map(TsFileResource::getMaxProgressIndex)
+ .collect(Collectors.toList()));
+
+ ProgressIndex maxProgressIndex = MinimumProgressIndex.INSTANCE;
+ for (ProgressIndex progressIndex : allProgressIndex) {
+ maxProgressIndex =
+ maxProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(
+ extractLocalSimpleProgressIndex(progressIndex));
+ }
+ groupId2MaxProgressIndex
+ .computeIfAbsent(dataRegionId, o -> MinimumProgressIndex.INSTANCE)
+ .updateToMinimumEqualOrIsAfterProgressIndex(maxProgressIndex);
+ groupId2MaxProgressIndex.put(
Review Comment:
It seems that either of the two statements achieves the expected result. I
suggest keeping only the former one and using it in "assignProgressIndex".
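As a rough illustration of folding the update into a single statement, here is a sketch that uses `Map.merge` on a `ConcurrentHashMap`. This is a swapped-in technique rather than the code in this PR: `MaxProgressTracker` is a hypothetical class, a `Long` stands in for `ProgressIndex`, and `Long::max` stands in for `updateToMinimumEqualOrIsAfterProgressIndex`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MaxProgressTracker {
  // Hypothetical stand-ins: String keys play the role of ConsensusGroupId,
  // Long values play the role of ProgressIndex.
  private final Map<String, Long> groupId2MaxProgress = new ConcurrentHashMap<>();

  /** Atomically raises the stored maximum for groupId and returns the new value. */
  public long update(String groupId, long candidate) {
    // merge() inserts the candidate if the key is absent; otherwise it
    // stores the larger of the existing value and the candidate.
    return groupId2MaxProgress.merge(groupId, candidate, Long::max);
  }

  /** Returns the stored maximum, or Long.MIN_VALUE if nothing was recorded. */
  public long get(String groupId) {
    return groupId2MaxProgress.getOrDefault(groupId, Long.MIN_VALUE);
  }

  public static void main(String[] args) {
    MaxProgressTracker tracker = new MaxProgressTracker();
    tracker.update("region-1", 5L);
    tracker.update("region-1", 3L); // a lower value does not overwrite
    tracker.update("region-1", 9L);
    System.out.println(tracker.get("region-1")); // prints 9
  }
}
```

The same single call could then serve both the recovery path and "assignProgressIndex", so the two code paths cannot drift apart.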
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/builder/PipeConsensusSyncBatchReqBuilder.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.builder;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+
+public class PipeConsensusSyncBatchReqBuilder extends PipeConsensusTransferBatchReqBuilder {
Review Comment:
Remember to check those and apply the newest version.