korlov42 commented on code in PR #4256:
URL: https://github.com/apache/ignite-3/pull/4256#discussion_r1741050516
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -247,30 +238,70 @@ void triggerCompaction(@Nullable HybridTimestamp lwm) {
return;
}
- lastRunFuture =
startCompaction(logicalTopologyService.localLogicalTopology());
+ lastRunFuture = startCompaction(lwm,
logicalTopologyService.localLogicalTopology());
}
});
}
- private CompletableFuture<Void> startCompaction(LogicalTopologySnapshot
topologySnapshot) {
- long localMinimum = localMinTimeProvider.time();
+ private @Nullable Long getMinLocalTime(HybridTimestamp lwm) {
+ Map<TablePartitionId, @Nullable Long> partitionStates =
localMinTimeProvider.minTimePerPartition();
- if (catalogManagerFacade.catalogByTsNullable(localMinimum) == null) {
- LOG.info("Catalog compaction skipped, nothing to compact
[timestamp={}].", localMinimum);
+ // Find the minimum time among all partitions.
+ long partitionMinTime = Long.MAX_VALUE;
- return CompletableFutures.nullCompletedFuture();
+ for (Map.Entry<TablePartitionId, Long> e : partitionStates.entrySet())
{
+ Long state = e.getValue();
+
+ if (state == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Partition state is missing [partition={},
all={}]", e.getKey(), partitionStates);
+ }
+ return null;
+ }
+
+ partitionMinTime = Math.min(partitionMinTime, state);
}
- return determineGlobalMinimumRequiredTime(topologySnapshot.nodes(),
localMinimum)
+ // Choose the minimum time between the low watermark and the minimum
time among all partitions.
+ long chosenMinTime = Math.min(lwm.longValue(), partitionMinTime);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Local minimum required time: [partitionMinTime={},
lowWatermark={}, chosen={}]",
+ partitionMinTime,
+ lwm,
+ chosenMinTime
+ );
+ }
+
+ return chosenMinTime;
+ }
+
+ private CompletableFuture<Void> startCompaction(HybridTimestamp lwm,
LogicalTopologySnapshot topologySnapshot) {
+ LOG.info("Catalog compaction started at [lowWaterMark={}]", lwm);
+
+ Long localMinRequiredTime = getMinLocalTime(lwm);
+
+ if (localMinRequiredTime == null) {
+ // If do not have local time yet, Use a placeholder value that is
going to be overwritten.
+ localMinRequiredTime = Long.MAX_VALUE;
Review Comment:
I believe the issue is still here. In `determineGlobalMinimumRequiredTime`,
`localMinimumRequiredTime` is used only once to initialize
`globalMinimumRequiredTime`, and if there is at least one remote response,
null-initialized `globalMinimumRequiredTime` will be re-initialized:
```
if (globalMinimumRequiredTime == null || response.minimumRequiredTime() <
globalMinimumRequiredTime) {
globalMinimumRequiredTime = response.minimumRequiredTime();
}
```
Besides, partitions from coordinator are seemed to be completely ignored.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -714,10 +713,15 @@ private void
handleUpdateMinimalActiveTxTimeCommand(UpdateMinimumActiveTxBeginTi
}
long minActiveTxBeginTime0 = minActiveTxBeginTime;
+ long timestamp = cmd.timestamp();
- assert minActiveTxBeginTime0 <= cmd.timestamp() : "maxTime=" +
minActiveTxBeginTime0 + ", cmdTime=" + cmd.timestamp();
+ assert minActiveTxBeginTime0 <= timestamp : "maxTime=" +
minActiveTxBeginTime0 + ", cmdTime=" + timestamp;
- minActiveTxBeginTime = cmd.timestamp();
+ storage.flush(false).whenComplete((r, t) -> {
+ if (timestamp > minActiveTxBeginTime) {
+ minActiveTxBeginTime = timestamp;
+ }
Review Comment:
this doesn't look solid... Do we have guarantee, that there is no other
thread updating `minActiveTxBeginTime` at the same time? If no, we need to do
check + update atomically
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]