DanielWang2035 commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r3278708133


##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java:
##########
@@ -720,13 +807,125 @@ public IndexedConsensusRequest 
buildIndexedConsensusRequestForLocalRequest(
           new IoTProgressIndex(thisNode.getNodeId(), searchIndex.get() + 1);
       ((ComparableConsensusRequest) 
request).setProgressIndex(iotProgressIndex);
     }
-    return new IndexedConsensusRequest(searchIndex.get() + 1, 
Collections.singletonList(request));
+    return new IndexedConsensusRequest(searchIndex.get() + 1, 
Collections.singletonList(request))
+        .setPhysicalTime(assignPhysicalTimeInMs())
+        .setNodeId(thisNode.getNodeId());
   }
 
   public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
-      long syncIndex, List<IConsensusRequest> requests) {
-    return new IndexedConsensusRequest(
-        ConsensusReqReader.DEFAULT_SEARCH_INDEX, syncIndex, requests);
+      long syncIndex,
+      long routingEpoch,
+      long physicalTime,
+      int nodeId,
+      List<IConsensusRequest> requests) {
+    observePhysicalTimeLowerBound(physicalTime);
+    IndexedConsensusRequest req =
+        new IndexedConsensusRequest(ConsensusReqReader.DEFAULT_SEARCH_INDEX, 
syncIndex, requests);
+    req.setRoutingEpoch(routingEpoch);
+    req.setPhysicalTime(physicalTime);
+    req.setNodeId(nodeId);
+    return req;
+  }
+
+  public WriterSafeFrontierTracker.SafeHlc createIdleSafeHlcForCurrentWriter() 
{
+    final long safePhysicalTime = assignPhysicalTimeInMs();
+    final long barrierLocalSeq = searchIndex.get();
+    writerSafeFrontierTracker.recordAppliedProgress(
+        safePhysicalTime, thisNode.getNodeId(), barrierLocalSeq);
+    return new WriterSafeFrontierTracker.SafeHlc(safePhysicalTime, 
barrierLocalSeq);
+  }
+
+  public void observeRemoteSafeHlc(
+      final long safePhysicalTime, final int writerNodeId, final long 
barrierLocalSeq) {
+    observePhysicalTimeLowerBound(safePhysicalTime);
+    writerSafeFrontierTracker.observePendingSafeHlc(
+        safePhysicalTime, writerNodeId, barrierLocalSeq);
+  }
+
+  public void recordRemoteAppliedWriterProgress(
+      final long physicalTime, final int writerNodeId, final long 
appliedLocalSeq) {
+    writerSafeFrontierTracker.recordAppliedProgress(physicalTime, 
writerNodeId, appliedLocalSeq);
+  }
+
+  public long getEffectiveSafePhysicalTime(final int writerNodeId) {
+    return writerSafeFrontierTracker.getEffectiveSafePt(writerNodeId);
+  }
+
+  public WriterSafeFrontierTracker getWriterSafeFrontierTracker() {
+    return writerSafeFrontierTracker;
+  }
+
+  public boolean hasSubscriptionConsumers() {
+    return !subscriptionQueueRegistry.isEmpty();
+  }
+
+  private long assignPhysicalTimeInMs() {
+    while (true) {
+      final long previous = lastAssignedPhysicalTime.get();
+      final long candidate = Math.max(System.currentTimeMillis(), previous);
+      if (lastAssignedPhysicalTime.compareAndSet(previous, candidate)) {
+        return candidate;
+      }
+    }
+  }
+
+  private void observePhysicalTimeLowerBound(final long observedPhysicalTime) {
+    if (observedPhysicalTime <= 0) {
+      return;
+    }
+    while (true) {
+      final long previous = lastAssignedPhysicalTime.get();
+      final long candidate = Math.max(previous, observedPhysicalTime);
+      if (candidate == previous || 
lastAssignedPhysicalTime.compareAndSet(previous, candidate)) {
+        return;
+      }
+    }
+  }
+
+  private void initializeWriterMeta() {
+    final long recoveredSearchIndex = searchIndex.get();
+    try {
+      final Optional<WriterMeta> writerMetaOptional = 
WriterMeta.load(writerMetaPath);
+      if (writerMetaOptional.isPresent()) {
+        final WriterMeta writerMeta = writerMetaOptional.get();
+        logger.info(
+            "Recovered writer meta for group {} from {}, recoveredLocalSeq={}, 
"
+                + "persistedLocalSeq={}",
+            consensusGroupId,
+            writerMetaPath,
+            recoveredSearchIndex,
+            writerMeta.getLastAllocatedLocalSeq());
+        lastAssignedPhysicalTime.set(
+            Math.max(writerMeta.getLastAssignedPhysicalTimeMs(), 
System.currentTimeMillis()));
+        return;
+      }
+    } catch (IOException e) {
+      logger.warn(
+          "Failed to load writer meta for group {} from {}. Starting fresh 
writer metadata.",
+          consensusGroupId,
+          writerMetaPath,
+          e);
+    }
+    lastAssignedPhysicalTime.set(System.currentTimeMillis());
+    logger.info(
+        "Initialized fresh writer meta for group {}, recoveredLocalSeq={}",
+        consensusGroupId,
+        recoveredSearchIndex);
+  }
+
+  private void persistWriterMetaOnSuccess(final IndexedConsensusRequest 
indexedConsensusRequest) {
+    try {
+      new WriterMeta(
+              indexedConsensusRequest.getLocalSeq(), 
indexedConsensusRequest.getPhysicalTime())
+          .persist(writerMetaPath);
+    } catch (IOException e) {
+      logger.warn(
+          "Failed to persist writer meta for group {} at localSeq={}, pt={}",
+          consensusGroupId,
+          indexedConsensusRequest.getLocalSeq(),
+          indexedConsensusRequest.getPhysicalTime(),
+          e);
+    }
   }

Review Comment:
   Avoided persisting on every successful write. The latest value is kept in 
memory, persisted at most once per second, and flushed on stop.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to