Pengzna commented on code in PR #12723:
URL: https://github.com/apache/iotdb/pull/12723#discussion_r1637408547


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -521,4 +551,25 @@ public int getTransferBufferSize() {
   public int getRetryBufferSize() {
     return retryEventQueue.size();
   }
+
+  @Override
+  public long getLocalUserWriteProgress() {
+    PipeStaticMeta staticMeta = 
PipeAgent.task().getPipeStaticMeta(consensusPipeName);

Review Comment:
   fixed



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusConnectorMetric.java:
##########
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.consensus;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Counter;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class PipeConsensusConnectorMetric implements IMetricSet {

Review Comment:
   fixed



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -521,4 +551,25 @@ public int getTransferBufferSize() {
   public int getRetryBufferSize() {
     return retryEventQueue.size();
   }
+
+  @Override
+  public long getLocalUserWriteProgress() {

Review Comment:
   makes sense



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -136,9 +141,14 @@ public void customize(PipeParameters parameters, 
PipeConnectorRuntimeConfigurati
 
     // currently, tablet batch is false by default in PipeConsensus;
     isTabletBatchModeEnabled = false;
+
+    // initialize metric components
+    pipeConsensusConnectorMetric = new PipeConsensusConnectorMetric(this);
+    PipeConsensusSyncLagManager.getInstance().addConsensusPipeConnector(this);
+    
MetricService.getInstance().addMetricSet(this.pipeConsensusConnectorMetric);
   }
 
-  /** Add an event to transferBuffer, whose events will be asynchronizedly 
transfer to receiver. */
+  /** Add an event to transferBuffer, whose events will be asynchronously 
transfer to receiver. */

Review Comment:
   fixed



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -521,4 +551,25 @@ public int getTransferBufferSize() {
   public int getRetryBufferSize() {
     return retryEventQueue.size();
   }
+
+  @Override
+  public long getLocalUserWriteProgress() {
+    PipeStaticMeta staticMeta = 
PipeAgent.task().getPipeStaticMeta(consensusPipeName);
+    String committerKey =
+        String.format(
+            "%s_%s_%s", staticMeta.getPipeName(), consensusGroupId, 
staticMeta.getCreationTime());
+    return 
PipeEventCommitManager.getInstance().getGivenConsensusPipeCommitId(committerKey);
+  }
+
+  @Override
+  public long getCurrentPipeReplicateProgress() {

Review Comment:
   One is the synchronization progress of the pipe event, and the other is the 
progress of the user writing the event



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -66,46 +74,35 @@
 
 // TODO: Optimize the network and disk io for TsFile onComplete
 // TODO: support Tablet Batch
-public class PipeConsensusAsyncConnector extends IoTDBConnector {
-
+public class PipeConsensusAsyncConnector extends IoTDBConnector implements 
ConsensusPipeConnector {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConsensusAsyncConnector.class);
-
   private static final String CUSTOMIZE_EXCEPTION_MSG =
       "Failed to customize pipeConsensusAsyncConnector because there isn't 
consensusGroupId passed in. Please check your construct parameters.";
-
   private static final String ENQUEUE_EXCEPTION_MSG =
       "Timeout: PipeConsensusConnector offers an event into transferBuffer 
failed, because transferBuffer is full.";
-
   private static final String THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT =
       "Failed to borrow client from client pool or exception occurred "
           + "when sending to receiver.";
-
   private static final String THRIFT_ERROR_FORMATTER_WITH_ENDPOINT =
       "Failed to borrow client from client pool or exception occurred "
           + "when sending to receiver %s:%s.";
-
   private static final IoTDBConfig IOTDB_CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
-
   private static final long PIPE_CONSENSUS_EVENT_ENQUEUE_TIMEOUT_IN_MS =
       IOTDB_CONFIG.getConnectionTimeoutInMS() / 6;
 
   private final BlockingQueue<Event> retryEventQueue = new 
LinkedBlockingQueue<>();
-
   // We use enrichedEvent here to make use of 
EnrichedEvent.equalsInPipeConsensus
   private final BlockingQueue<EnrichedEvent> transferBuffer =
       new LinkedBlockingDeque<>(IOTDB_CONFIG.getPipeConsensusPipelineSize());
-
   private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
   private final int thisDataNodeId = 
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
-
+  private PipeConsensusConnectorMetric pipeConsensusConnectorMetric;
+  private String consensusPipeName;
   private int consensusGroupId;
-
   private PipeConsensusSyncConnector retryConnector;
-
   private IClientManager<TEndPoint, AsyncPipeConsensusServiceClient> 
asyncTransferClientManager;
-
   private PipeConsensusAsyncBatchReqBuilder tabletBatchBuilder;
+  private long currentReplicateProgress = 0;

Review Comment:
   good catch. fixed



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java:
##########
@@ -478,6 +500,13 @@ public synchronized void clearRetryEventsReferenceCount() {
     }
   }
 
+  public synchronized void clearTransferBufferReferenceCount() {

Review Comment:
   good catch, fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to