Caideyipi commented on code in PR #12723:
URL: https://github.com/apache/iotdb/pull/12723#discussion_r1636190217


##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java:
##########
@@ -274,21 +282,45 @@ public synchronized void checkConsensusPipe(Map<ConsensusPipeName, PipeStatus> e
 
   public TSStatus write(IConsensusRequest request) {
     try {
+      long consensusWriteStartTime = System.nanoTime();
       stateMachineLock.lock();
+      long getStateMachineLockTime = System.nanoTime();
+      // statistic the time of acquiring stateMachine lock
+      pipeConsensusServerMetrics.recordGetStateMachineLockTime(
+          getStateMachineLockTime - consensusWriteStartTime);
       if (request instanceof ComparableConsensusRequest) {
         ((ComparableConsensusRequest) request)
             .setProgressIndex(progressIndexManager.assignProgressIndex(thisNode.getGroupId()));
       }
-      return stateMachine.write(request);
+
+      long writeToStateMachineStartTime = System.nanoTime();
+      TSStatus result = stateMachine.write(request);
+      long writeToStateMachineEndTime = System.nanoTime();
+      // statistic the time of writing request into stateMachine
+      pipeConsensusServerMetrics.recordWriteStateMachineTime(
+          writeToStateMachineEndTime - writeToStateMachineStartTime);
+      return result;
     } finally {
       stateMachineLock.unlock();
     }
   }
 
   public TSStatus writeOnFollowerReplica(IConsensusRequest request) {
     try {
+      long consensusWriteStartTime = System.nanoTime();
       stateMachineLock.lock();
-      return stateMachine.write(request);
+      long getStateMachineLockTime = System.nanoTime();
+      // statistic the time of acquiring stateMachine lock
+      pipeConsensusServerMetrics.recordGetStateMachineLockTime(
+          getStateMachineLockTime - consensusWriteStartTime);
+
+      long writeToStateMachineStartTime = System.nanoTime();
+      TSStatus result = stateMachine.write(request);
+      long writeToStateMachineEndTime = System.nanoTime();
+      // statistic the time of writing request into stateMachine
+      pipeConsensusServerMetrics.recordWriteStateMachineTime(

Review Comment:
   Same as above.



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java:
##########
@@ -53,6 +53,10 @@ public enum Metric {
   // consensus related
   STAGE("stage"),
   IOT_CONSENSUS("iot_consensus"),
+  PIPE_CONSENSUS("pipe_consensus"),

Review Comment:
   Better to place these after "IOT_RECEIVE_LOG".



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -521,4 +551,25 @@ public int getTransferBufferSize() {
   public int getRetryBufferSize() {
     return retryEventQueue.size();
   }
+
+  @Override
+  public long getLocalUserWriteProgress() {
+    PipeStaticMeta staticMeta = PipeAgent.task().getPipeStaticMeta(consensusPipeName);

Review Comment:
   Better not to expose "staticMeta" outside PipeDataNodeTaskAgent, since it
could then be altered by callers. It would be better to add a
"getCreationTime" method to the agent instead.
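
   A minimal sketch of the narrow-accessor idea, using hypothetical stand-in
classes (`StaticMetaSketch`, `TaskAgentSketch`) rather than the real
PipeStaticMeta or PipeDataNodeTaskAgent. It assumes the caller only needs the
creation time to build its committer key, so the agent hands back the value
and never the meta object itself:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the pipe's static metadata; not the real PipeStaticMeta.
final class StaticMetaSketch {
  private final String pipeName;
  private final long creationTime;

  StaticMetaSketch(String pipeName, long creationTime) {
    this.pipeName = pipeName;
    this.creationTime = creationTime;
  }

  String getPipeName() {
    return pipeName;
  }

  long getCreationTime() {
    return creationTime;
  }
}

// Hypothetical stand-in for the task agent that owns the metadata.
final class TaskAgentSketch {
  private final Map<String, StaticMetaSketch> metaByPipeName = new ConcurrentHashMap<>();

  void register(StaticMetaSketch meta) {
    metaByPipeName.put(meta.getPipeName(), meta);
  }

  // Narrow accessor: returns only the value the caller needs, never the meta object.
  long getCreationTime(String pipeName) {
    StaticMetaSketch meta = metaByPipeName.get(pipeName);
    if (meta == null) {
      throw new IllegalArgumentException("Unknown pipe: " + pipeName);
    }
    return meta.getCreationTime();
  }
}
```

   With this shape the connector would call `getCreationTime(consensusPipeName)`
and keep using its own pipe name and group id for the key.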



##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeConnector.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.pipe.consensuspipe;
+
+public interface ConsensusPipeConnector {

Review Comment:
   Better not to use "connector" here, since a connector is already a
component of a pipe. A better name may be "reporter".



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -478,6 +500,13 @@ public synchronized void clearRetryEventsReferenceCount() {
     }
   }
 
+  public synchronized void clearTransferBufferReferenceCount() {

Review Comment:
   Events may still be added to "transferBuffer" after this function has
finished. Consider using "synchronized" on the adding side as well, or handling
logic similar to that in "addFailureEventToRetryQueue".



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -521,4 +551,25 @@ public int getTransferBufferSize() {
   public int getRetryBufferSize() {
     return retryEventQueue.size();
   }
+
+  @Override
+  public long getLocalUserWriteProgress() {

Review Comment:
   Need a better name... Consensus pipe commit progress?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -66,46 +74,35 @@
 
 // TODO: Optimize the network and disk io for TsFile onComplete
 // TODO: support Tablet Batch
-public class PipeConsensusAsyncConnector extends IoTDBConnector {
-
+public class PipeConsensusAsyncConnector extends IoTDBConnector implements ConsensusPipeConnector {
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeConsensusAsyncConnector.class);
-
   private static final String CUSTOMIZE_EXCEPTION_MSG =
       "Failed to customize pipeConsensusAsyncConnector because there isn't consensusGroupId passed in. Please check your construct parameters.";
-
   private static final String ENQUEUE_EXCEPTION_MSG =
       "Timeout: PipeConsensusConnector offers an event into transferBuffer failed, because transferBuffer is full.";
-
   private static final String THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT =
       "Failed to borrow client from client pool or exception occurred "
           + "when sending to receiver.";
-
   private static final String THRIFT_ERROR_FORMATTER_WITH_ENDPOINT =
       "Failed to borrow client from client pool or exception occurred "
           + "when sending to receiver %s:%s.";
-
   private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();
-
   private static final long PIPE_CONSENSUS_EVENT_ENQUEUE_TIMEOUT_IN_MS =
       IOTDB_CONFIG.getConnectionTimeoutInMS() / 6;
 
   private final BlockingQueue<Event> retryEventQueue = new LinkedBlockingQueue<>();
-
   // We use enrichedEvent here to make use of EnrichedEvent.equalsInPipeConsensus
   private final BlockingQueue<EnrichedEvent> transferBuffer =
       new LinkedBlockingDeque<>(IOTDB_CONFIG.getPipeConsensusPipelineSize());
-
   private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
   private final int thisDataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
-
+  private PipeConsensusConnectorMetric pipeConsensusConnectorMetric;
+  private String consensusPipeName;
   private int consensusGroupId;
-
   private PipeConsensusSyncConnector retryConnector;
-
   private IClientManager<TEndPoint, AsyncPipeConsensusServiceClient> asyncTransferClientManager;
-
   private PipeConsensusAsyncBatchReqBuilder tabletBatchBuilder;
+  private long currentReplicateProgress = 0;

Review Comment:
   Better to make this "volatile", since the variable is read by the metrics
and updated by the consensus subtask executor.
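
   A small sketch of the visibility concern, with a hypothetical holder class
`ReplicateProgressSketch`. It assumes a single writer thread (the subtask
executor) and any number of reader threads (the metrics); with several writers
doing read-modify-write updates, an `AtomicLong` would be the safer choice:

```java
// Hypothetical holder; only the field name is taken from the diff.
final class ReplicateProgressSketch {
  // volatile makes each write visible to other threads and makes 64-bit
  // reads and writes of the long atomic (JLS 17.7).
  private volatile long currentReplicateProgress = 0;

  // Called by the single writer thread.
  void update(long progress) {
    currentReplicateProgress = progress;
  }

  // Called by reader threads such as a metrics gauge.
  long get() {
    return currentReplicateProgress;
  }
}
```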



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -136,9 +141,14 @@ public void customize(PipeParameters parameters, PipeConnectorRuntimeConfigurati
 
     // currently, tablet batch is false by default in PipeConsensus;
     isTabletBatchModeEnabled = false;
+
+    // initialize metric components
+    pipeConsensusConnectorMetric = new PipeConsensusConnectorMetric(this);
+    PipeConsensusSyncLagManager.getInstance().addConsensusPipeConnector(this);
+    MetricService.getInstance().addMetricSet(this.pipeConsensusConnectorMetric);
   }
 
-  /** Add an event to transferBuffer, whose events will be asynchronizedly transfer to receiver. */
+  /** Add an event to transferBuffer, whose events will be asynchronously transfer to receiver. */

Review Comment:
   transferred



##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java:
##########
@@ -129,6 +136,7 @@ public PipeConsensusServerImpl(
 
   public synchronized void start(boolean startConsensusPipes) throws IOException {
     stateMachine.start();
+    MetricService.getInstance().addMetricSet(this.pipeConsensusServerMetrics);

Review Comment:
   removeMetricSet?
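
   A sketch of the symmetry being asked about, using a hypothetical registry
(`MetricRegistrySketch`) in place of the real MetricService. The assumption is
that whatever `start` registers, the matching `stop` unregisters, so a
restarted server does not leave stale metric sets behind:

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical registry standing in for a metric service.
final class MetricRegistrySketch {
  private final Set<Object> metricSets = new LinkedHashSet<>();

  synchronized void add(Object metricSet) {
    metricSets.add(metricSet);
  }

  synchronized void remove(Object metricSet) {
    metricSets.remove(metricSet);
  }

  synchronized int count() {
    return metricSets.size();
  }
}

// Hypothetical server whose lifecycle methods register and unregister in pairs.
final class ServerSketch {
  private final MetricRegistrySketch registry;
  private final Object serverMetrics = new Object();

  ServerSketch(MetricRegistrySketch registry) {
    this.registry = registry;
  }

  synchronized void start() {
    registry.add(serverMetrics);
  }

  synchronized void stop() {
    registry.remove(serverMetrics);
  }
}
```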



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -118,10 +115,18 @@ public void customize(PipeParameters parameters, PipeConnectorRuntimeConfigurati
     }
     consensusGroupId = parameters.getInt(PipeConnectorConstant.CONNECTOR_CONSENSUS_GROUP_ID_KEY);
 
+    // Get consensusPipeName from parameters passed by PipeConsensusImpl
+    if (!parameters.hasAttribute(PipeConnectorConstant.CONNECTOR_CONSENSUS_PIPE_NAME)) {
+      throw new PipeException(CUSTOMIZE_EXCEPTION_MSG);

Review Comment:
   Better to do this check in "validate", to keep the semantics of "validate"
and "customize" separate.
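
   A sketch of that split with hypothetical names (`ConnectorSketch`,
`PIPE_NAME_KEY`) and a plain `Map` in place of the real parameter classes. It
assumes the framework calls `validate` before `customize`, so the
required-attribute check lives in the former and the latter only reads values:

```java
import java.util.Map;

// Hypothetical connector; the key name is illustrative, not the real constant.
final class ConnectorSketch {
  static final String PIPE_NAME_KEY = "consensus.pipe-name";

  private String consensusPipeName;

  // Rejects incomplete parameters up front, before any state is touched.
  void validate(Map<String, String> parameters) {
    if (!parameters.containsKey(PIPE_NAME_KEY)) {
      throw new IllegalArgumentException("Missing required attribute: " + PIPE_NAME_KEY);
    }
  }

  // Only reads values; it relies on validate() having run first.
  void customize(Map<String, String> parameters) {
    consensusPipeName = parameters.get(PIPE_NAME_KEY);
  }

  String getConsensusPipeName() {
    return consensusPipeName;
  }
}
```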



##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java:
##########
@@ -274,21 +282,45 @@ public synchronized void checkConsensusPipe(Map<ConsensusPipeName, PipeStatus> e
 
   public TSStatus write(IConsensusRequest request) {
     try {
+      long consensusWriteStartTime = System.nanoTime();
       stateMachineLock.lock();
+      long getStateMachineLockTime = System.nanoTime();
+      // statistic the time of acquiring stateMachine lock
+      pipeConsensusServerMetrics.recordGetStateMachineLockTime(
+          getStateMachineLockTime - consensusWriteStartTime);
       if (request instanceof ComparableConsensusRequest) {
         ((ComparableConsensusRequest) request)
             .setProgressIndex(progressIndexManager.assignProgressIndex(thisNode.getGroupId()));
       }
-      return stateMachine.write(request);
+
+      long writeToStateMachineStartTime = System.nanoTime();
+      TSStatus result = stateMachine.write(request);
+      long writeToStateMachineEndTime = System.nanoTime();
+      // statistic the time of writing request into stateMachine
+      pipeConsensusServerMetrics.recordWriteStateMachineTime(

Review Comment:
   Consider "PERFORMANCE_OVERVIEW_METRICS", like in IoTConsensusServerImpl.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -521,4 +551,25 @@ public int getTransferBufferSize() {
   public int getRetryBufferSize() {
     return retryEventQueue.size();
   }
+
+  @Override
+  public long getLocalUserWriteProgress() {
+    PipeStaticMeta staticMeta = PipeAgent.task().getPipeStaticMeta(consensusPipeName);
+    String committerKey =
+        String.format(
+            "%s_%s_%s", staticMeta.getPipeName(), consensusGroupId, staticMeta.getCreationTime());
+    return PipeEventCommitManager.getInstance().getGivenConsensusPipeCommitId(committerKey);
+  }
+
+  @Override
+  public long getCurrentPipeReplicateProgress() {

Review Comment:
   What's the difference between it and the "writeProgress"?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusConnectorMetric.java:
##########
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.consensus;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Counter;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class PipeConsensusConnectorMetric implements IMetricSet {

Review Comment:
   In pipe we use "Metrics" (plural) in such class names.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

