Caideyipi commented on code in PR #12723:
URL: https://github.com/apache/iotdb/pull/12723#discussion_r1636190217
##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java:
##########
@@ -274,21 +282,45 @@ public synchronized void
checkConsensusPipe(Map<ConsensusPipeName, PipeStatus> e
public TSStatus write(IConsensusRequest request) {
try {
+ long consensusWriteStartTime = System.nanoTime();
stateMachineLock.lock();
+ long getStateMachineLockTime = System.nanoTime();
+ // statistic the time of acquiring stateMachine lock
+ pipeConsensusServerMetrics.recordGetStateMachineLockTime(
+ getStateMachineLockTime - consensusWriteStartTime);
if (request instanceof ComparableConsensusRequest) {
((ComparableConsensusRequest) request)
.setProgressIndex(progressIndexManager.assignProgressIndex(thisNode.getGroupId()));
}
- return stateMachine.write(request);
+
+ long writeToStateMachineStartTime = System.nanoTime();
+ TSStatus result = stateMachine.write(request);
+ long writeToStateMachineEndTime = System.nanoTime();
+ // statistic the time of writing request into stateMachine
+ pipeConsensusServerMetrics.recordWriteStateMachineTime(
+ writeToStateMachineEndTime - writeToStateMachineStartTime);
+ return result;
} finally {
stateMachineLock.unlock();
}
}
public TSStatus writeOnFollowerReplica(IConsensusRequest request) {
try {
+ long consensusWriteStartTime = System.nanoTime();
stateMachineLock.lock();
- return stateMachine.write(request);
+ long getStateMachineLockTime = System.nanoTime();
+ // statistic the time of acquiring stateMachine lock
+ pipeConsensusServerMetrics.recordGetStateMachineLockTime(
+ getStateMachineLockTime - consensusWriteStartTime);
+
+ long writeToStateMachineStartTime = System.nanoTime();
+ TSStatus result = stateMachine.write(request);
+ long writeToStateMachineEndTime = System.nanoTime();
+ // statistic the time of writing request into stateMachine
+ pipeConsensusServerMetrics.recordWriteStateMachineTime(
Review Comment:
Same as above.
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java:
##########
@@ -53,6 +53,10 @@ public enum Metric {
// consensus related
STAGE("stage"),
IOT_CONSENSUS("iot_consensus"),
+ PIPE_CONSENSUS("pipe_consensus"),
Review Comment:
Better place these after "IOT_RECEIVE_LOG"....
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -521,4 +551,25 @@ public int getTransferBufferSize() {
public int getRetryBufferSize() {
return retryEventQueue.size();
}
+
+ @Override
+ public long getLocalUserWriteProgress() {
+ PipeStaticMeta staticMeta =
PipeAgent.task().getPipeStaticMeta(consensusPipeName);
Review Comment:
Better not expose the "staticMeta" in PipeDataNodeTaskAgent only... There
may be potential altering to it. It's better to add a "getCreationTime" in it.
##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeConnector.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.pipe.consensuspipe;
+
+public interface ConsensusPipeConnector {
Review Comment:
Better not use "connector" since it's a component of pipe.. A better name
may be "reporter"...
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -478,6 +500,13 @@ public synchronized void clearRetryEventsReferenceCount() {
}
}
+ public synchronized void clearTransferBufferReferenceCount() {
Review Comment:
There may still be some events added into the "transferBuffer" even if this
function has finished.. Can use "synchronized" or the similar handling logic to
"addFailureEventToRetryQueue".
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -521,4 +551,25 @@ public int getTransferBufferSize() {
public int getRetryBufferSize() {
return retryEventQueue.size();
}
+
+ @Override
+ public long getLocalUserWriteProgress() {
Review Comment:
Need a better name... Consensus pipe commit progress?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -66,46 +74,35 @@
// TODO: Optimize the network and disk io for TsFile onComplete
// TODO: support Tablet Batch
-public class PipeConsensusAsyncConnector extends IoTDBConnector {
-
+public class PipeConsensusAsyncConnector extends IoTDBConnector implements
ConsensusPipeConnector {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConsensusAsyncConnector.class);
-
private static final String CUSTOMIZE_EXCEPTION_MSG =
"Failed to customize pipeConsensusAsyncConnector because there isn't
consensusGroupId passed in. Please check your construct parameters.";
-
private static final String ENQUEUE_EXCEPTION_MSG =
"Timeout: PipeConsensusConnector offers an event into transferBuffer
failed, because transferBuffer is full.";
-
private static final String THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT =
"Failed to borrow client from client pool or exception occurred "
+ "when sending to receiver.";
-
private static final String THRIFT_ERROR_FORMATTER_WITH_ENDPOINT =
"Failed to borrow client from client pool or exception occurred "
+ "when sending to receiver %s:%s.";
-
private static final IoTDBConfig IOTDB_CONFIG =
IoTDBDescriptor.getInstance().getConfig();
-
private static final long PIPE_CONSENSUS_EVENT_ENQUEUE_TIMEOUT_IN_MS =
IOTDB_CONFIG.getConnectionTimeoutInMS() / 6;
private final BlockingQueue<Event> retryEventQueue = new
LinkedBlockingQueue<>();
-
// We use enrichedEvent here to make use of
EnrichedEvent.equalsInPipeConsensus
private final BlockingQueue<EnrichedEvent> transferBuffer =
new LinkedBlockingDeque<>(IOTDB_CONFIG.getPipeConsensusPipelineSize());
-
private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
private final int thisDataNodeId =
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
-
+ private PipeConsensusConnectorMetric pipeConsensusConnectorMetric;
+ private String consensusPipeName;
private int consensusGroupId;
-
private PipeConsensusSyncConnector retryConnector;
-
private IClientManager<TEndPoint, AsyncPipeConsensusServiceClient>
asyncTransferClientManager;
-
private PipeConsensusAsyncBatchReqBuilder tabletBatchBuilder;
+ private long currentReplicateProgress = 0;
Review Comment:
Better use "volatile" since the variable is read by the metrics and updated
by the consensus subtask executor.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -136,9 +141,14 @@ public void customize(PipeParameters parameters,
PipeConnectorRuntimeConfigurati
// currently, tablet batch is false by default in PipeConsensus;
isTabletBatchModeEnabled = false;
+
+ // initialize metric components
+ pipeConsensusConnectorMetric = new PipeConsensusConnectorMetric(this);
+ PipeConsensusSyncLagManager.getInstance().addConsensusPipeConnector(this);
+
MetricService.getInstance().addMetricSet(this.pipeConsensusConnectorMetric);
}
- /** Add an event to transferBuffer, whose events will be asynchronizedly
transfer to receiver. */
+ /** Add an event to transferBuffer, whose events will be asynchronously
transfer to receiver. */
Review Comment:
transferred
##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java:
##########
@@ -129,6 +136,7 @@ public PipeConsensusServerImpl(
public synchronized void start(boolean startConsensusPipes) throws
IOException {
stateMachine.start();
+ MetricService.getInstance().addMetricSet(this.pipeConsensusServerMetrics);
Review Comment:
removeMetricSet?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -118,10 +115,18 @@ public void customize(PipeParameters parameters,
PipeConnectorRuntimeConfigurati
}
consensusGroupId =
parameters.getInt(PipeConnectorConstant.CONNECTOR_CONSENSUS_GROUP_ID_KEY);
+ // Get consensusPipeName from parameters passed by PipeConsensusImpl
+ if
(!parameters.hasAttribute(PipeConnectorConstant.CONNECTOR_CONSENSUS_PIPE_NAME))
{
+ throw new PipeException(CUSTOMIZE_EXCEPTION_MSG);
Review Comment:
Better use "validate" to keep the semantics.
##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java:
##########
@@ -274,21 +282,45 @@ public synchronized void
checkConsensusPipe(Map<ConsensusPipeName, PipeStatus> e
public TSStatus write(IConsensusRequest request) {
try {
+ long consensusWriteStartTime = System.nanoTime();
stateMachineLock.lock();
+ long getStateMachineLockTime = System.nanoTime();
+ // statistic the time of acquiring stateMachine lock
+ pipeConsensusServerMetrics.recordGetStateMachineLockTime(
+ getStateMachineLockTime - consensusWriteStartTime);
if (request instanceof ComparableConsensusRequest) {
((ComparableConsensusRequest) request)
.setProgressIndex(progressIndexManager.assignProgressIndex(thisNode.getGroupId()));
}
- return stateMachine.write(request);
+
+ long writeToStateMachineStartTime = System.nanoTime();
+ TSStatus result = stateMachine.write(request);
+ long writeToStateMachineEndTime = System.nanoTime();
+ // statistic the time of writing request into stateMachine
+ pipeConsensusServerMetrics.recordWriteStateMachineTime(
Review Comment:
Consider "PERFORMANCE_OVERVIEW_METRICS", like in IoTConsensusServerImpl.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -521,4 +551,25 @@ public int getTransferBufferSize() {
public int getRetryBufferSize() {
return retryEventQueue.size();
}
+
+ @Override
+ public long getLocalUserWriteProgress() {
+ PipeStaticMeta staticMeta =
PipeAgent.task().getPipeStaticMeta(consensusPipeName);
+ String committerKey =
+ String.format(
+ "%s_%s_%s", staticMeta.getPipeName(), consensusGroupId,
staticMeta.getCreationTime());
+ return
PipeEventCommitManager.getInstance().getGivenConsensusPipeCommitId(committerKey);
+ }
+
+ @Override
+ public long getCurrentPipeReplicateProgress() {
Review Comment:
What's the difference between it and the "writeProgress"?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusConnectorMetric.java:
##########
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.consensus;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Counter;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class PipeConsensusConnectorMetric implements IMetricSet {
Review Comment:
In pipe we use "Metrics"....
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]