Pengzna commented on code in PR #14219:
URL: https://github.com/apache/iotdb/pull/14219#discussion_r1859917060
##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java:
##########
@@ -33,66 +34,50 @@
* <p>Note: every consensusGroup/dataRegion has and only has 1 instance of
this class.
*/
public class PipeConsensusSyncLagManager {
- long userWriteProgress = 0;
- long minReplicateProgress = Long.MAX_VALUE;
+ long syncLag = Long.MIN_VALUE;
+ ReentrantLock lock = new ReentrantLock();
List<ConsensusPipeConnector> consensusPipeConnectorList = new
CopyOnWriteArrayList<>();
- private void updateReplicateProgress() {
- minReplicateProgress = Long.MAX_VALUE;
- // if there isn't a consensus pipe task, replicate progress is
Long.MAX_VALUE.
- if (consensusPipeConnectorList.isEmpty()) {
- return;
- }
- // else we find the minimum progress in all consensus pipe task.
- consensusPipeConnectorList.forEach(
- consensusPipeConnector ->
- minReplicateProgress =
- Math.min(
- minReplicateProgress,
-
consensusPipeConnector.getConsensusPipeReplicateProgress()));
- }
-
- private void updateUserWriteProgress() {
- // if there isn't a consensus pipe task, user write progress is 0.
- if (consensusPipeConnectorList.isEmpty()) {
- userWriteProgress = 0;
- return;
- }
- // since the user write progress of different consensus pipes on the same
DataRegion is the
- // same, we only need to take out one Connector to calculate
- try {
- ConsensusPipeConnector connector = consensusPipeConnectorList.get(0);
- userWriteProgress = connector.getConsensusPipeCommitProgress();
- } catch (Exception e) {
- // if removing the last connector happens after empty check, we may
encounter
- // OutOfBoundsException, in this case, we set userWriteProgress to 0.
- userWriteProgress = 0;
- }
+ private long getSyncLagForSpecificConsensusPipe(ConsensusPipeConnector
consensusPipeConnector) {
+ long userWriteProgress =
consensusPipeConnector.getConsensusPipeCommitProgress();
+ long replicateProgress =
consensusPipeConnector.getConsensusPipeReplicateProgress();
+ return Math.max(userWriteProgress - replicateProgress, 0);
}
public void addConsensusPipeConnector(ConsensusPipeConnector
consensusPipeConnector) {
consensusPipeConnectorList.add(consensusPipeConnector);
Review Comment:
this method is called and is only called in a locked method. so don't need
lock here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]