DanielWang2035 commented on code in PR #17238:
URL: https://github.com/apache/iotdb/pull/17238#discussion_r3278708133
##########
iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java:
##########
@@ -720,13 +807,125 @@ public IndexedConsensusRequest
buildIndexedConsensusRequestForLocalRequest(
new IoTProgressIndex(thisNode.getNodeId(), searchIndex.get() + 1);
((ComparableConsensusRequest)
request).setProgressIndex(iotProgressIndex);
}
- return new IndexedConsensusRequest(searchIndex.get() + 1,
Collections.singletonList(request));
+ return new IndexedConsensusRequest(searchIndex.get() + 1,
Collections.singletonList(request))
+ .setPhysicalTime(assignPhysicalTimeInMs())
+ .setNodeId(thisNode.getNodeId());
}
public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
- long syncIndex, List<IConsensusRequest> requests) {
- return new IndexedConsensusRequest(
- ConsensusReqReader.DEFAULT_SEARCH_INDEX, syncIndex, requests);
+ long syncIndex,
+ long routingEpoch,
+ long physicalTime,
+ int nodeId,
+ List<IConsensusRequest> requests) {
+ observePhysicalTimeLowerBound(physicalTime);
+ IndexedConsensusRequest req =
+ new IndexedConsensusRequest(ConsensusReqReader.DEFAULT_SEARCH_INDEX,
syncIndex, requests);
+ req.setRoutingEpoch(routingEpoch);
+ req.setPhysicalTime(physicalTime);
+ req.setNodeId(nodeId);
+ return req;
+ }
+
+ public WriterSafeFrontierTracker.SafeHlc createIdleSafeHlcForCurrentWriter()
{
+ final long safePhysicalTime = assignPhysicalTimeInMs();
+ final long barrierLocalSeq = searchIndex.get();
+ writerSafeFrontierTracker.recordAppliedProgress(
+ safePhysicalTime, thisNode.getNodeId(), barrierLocalSeq);
+ return new WriterSafeFrontierTracker.SafeHlc(safePhysicalTime,
barrierLocalSeq);
+ }
+
+ public void observeRemoteSafeHlc(
+ final long safePhysicalTime, final int writerNodeId, final long
barrierLocalSeq) {
+ observePhysicalTimeLowerBound(safePhysicalTime);
+ writerSafeFrontierTracker.observePendingSafeHlc(
+ safePhysicalTime, writerNodeId, barrierLocalSeq);
+ }
+
+ public void recordRemoteAppliedWriterProgress(
+ final long physicalTime, final int writerNodeId, final long
appliedLocalSeq) {
+ writerSafeFrontierTracker.recordAppliedProgress(physicalTime,
writerNodeId, appliedLocalSeq);
+ }
+
+ public long getEffectiveSafePhysicalTime(final int writerNodeId) {
+ return writerSafeFrontierTracker.getEffectiveSafePt(writerNodeId);
+ }
+
+ public WriterSafeFrontierTracker getWriterSafeFrontierTracker() {
+ return writerSafeFrontierTracker;
+ }
+
+ public boolean hasSubscriptionConsumers() {
+ return !subscriptionQueueRegistry.isEmpty();
+ }
+
+ private long assignPhysicalTimeInMs() {
+ while (true) {
+ final long previous = lastAssignedPhysicalTime.get();
+ final long candidate = Math.max(System.currentTimeMillis(), previous);
+ if (lastAssignedPhysicalTime.compareAndSet(previous, candidate)) {
+ return candidate;
+ }
+ }
+ }
+
+ private void observePhysicalTimeLowerBound(final long observedPhysicalTime) {
+ if (observedPhysicalTime <= 0) {
+ return;
+ }
+ while (true) {
+ final long previous = lastAssignedPhysicalTime.get();
+ final long candidate = Math.max(previous, observedPhysicalTime);
+ if (candidate == previous ||
lastAssignedPhysicalTime.compareAndSet(previous, candidate)) {
+ return;
+ }
+ }
+ }
+
+ private void initializeWriterMeta() {
+ final long recoveredSearchIndex = searchIndex.get();
+ try {
+ final Optional<WriterMeta> writerMetaOptional =
WriterMeta.load(writerMetaPath);
+ if (writerMetaOptional.isPresent()) {
+ final WriterMeta writerMeta = writerMetaOptional.get();
+ logger.info(
+ "Recovered writer meta for group {} from {}, recoveredLocalSeq={},
"
+ + "persistedLocalSeq={}",
+ consensusGroupId,
+ writerMetaPath,
+ recoveredSearchIndex,
+ writerMeta.getLastAllocatedLocalSeq());
+ lastAssignedPhysicalTime.set(
+ Math.max(writerMeta.getLastAssignedPhysicalTimeMs(),
System.currentTimeMillis()));
+ return;
+ }
+ } catch (IOException e) {
+ logger.warn(
+ "Failed to load writer meta for group {} from {}. Starting fresh
writer metadata.",
+ consensusGroupId,
+ writerMetaPath,
+ e);
+ }
+ lastAssignedPhysicalTime.set(System.currentTimeMillis());
+ logger.info(
+ "Initialized fresh writer meta for group {}, recoveredLocalSeq={}",
+ consensusGroupId,
+ recoveredSearchIndex);
+ }
+
+ private void persistWriterMetaOnSuccess(final IndexedConsensusRequest
indexedConsensusRequest) {
+ try {
+ new WriterMeta(
+ indexedConsensusRequest.getLocalSeq(),
indexedConsensusRequest.getPhysicalTime())
+ .persist(writerMetaPath);
+ } catch (IOException e) {
+ logger.warn(
+ "Failed to persist writer meta for group {} at localSeq={}, pt={}",
+ consensusGroupId,
+ indexedConsensusRequest.getLocalSeq(),
+ indexedConsensusRequest.getPhysicalTime(),
+ e);
+ }
}
Review Comment:
Avoided persisting on every successful write. The latest value is kept in
memory, persisted at most once per second, and flushed on stop.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]