korlov42 commented on code in PR #4256:
URL: https://github.com/apache/ignite-3/pull/4256#discussion_r1734398680
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -247,30 +238,57 @@ void triggerCompaction(@Nullable HybridTimestamp lwm) {
return;
}
- lastRunFuture =
startCompaction(logicalTopologyService.localLogicalTopology());
+ lastRunFuture = startCompaction(lwm,
logicalTopologyService.localLogicalTopology());
}
});
}
- private CompletableFuture<Void> startCompaction(LogicalTopologySnapshot
topologySnapshot) {
- long localMinimum = localMinTimeProvider.time();
+ private @Nullable Long getMinLocalTime(HybridTimestamp lwm) {
+ Map<TablePartitionId, @Nullable Long> partitionStates =
localMinTimeProvider.minTimePerPartition();
- if (catalogManagerFacade.catalogByTsNullable(localMinimum) == null) {
- LOG.info("Catalog compaction skipped, nothing to compact
[timestamp={}].", localMinimum);
+ // Find the minimum time among all partitions.
+ long partitionMinTime = Long.MAX_VALUE;
- return CompletableFutures.nullCompletedFuture();
+ for (Map.Entry<TablePartitionId, Long> e : partitionStates.entrySet())
{
+ Long state = e.getValue();
+
+ if (state == null) {
+ LOG.debug("Partition state is missing [partition={}].",
e.getKey());
+ return null;
+ }
+
+ partitionMinTime = Math.min(partitionMinTime, state);
}
- return determineGlobalMinimumRequiredTime(topologySnapshot.nodes(),
localMinimum)
+ // Choose the minimum time between the low watermark and the minimum
time among all partitions.
+ long chosenMinTime = Math.min(lwm.longValue(), partitionMinTime);
+
+ LOG.debug("Local minimum required time: [partitionMinTime={},
lowWatermark={}, chosen={}].",
+ partitionMinTime,
+ lwm,
+ chosenMinTime
+ );
+
+ return chosenMinTime;
+ }
+
+ private CompletableFuture<Void> startCompaction(HybridTimestamp lwm,
LogicalTopologySnapshot topologySnapshot) {
+ LOG.info("Catalog compaction started at [lowWaterMark={}].", lwm);
Review Comment:
```suggestion
LOG.info("Catalog compaction started [lowWaterMark={}].", lwm);
```
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -247,30 +238,57 @@ void triggerCompaction(@Nullable HybridTimestamp lwm) {
return;
}
- lastRunFuture =
startCompaction(logicalTopologyService.localLogicalTopology());
+ lastRunFuture = startCompaction(lwm,
logicalTopologyService.localLogicalTopology());
}
});
}
- private CompletableFuture<Void> startCompaction(LogicalTopologySnapshot
topologySnapshot) {
- long localMinimum = localMinTimeProvider.time();
+ private @Nullable Long getMinLocalTime(HybridTimestamp lwm) {
+ Map<TablePartitionId, @Nullable Long> partitionStates =
localMinTimeProvider.minTimePerPartition();
- if (catalogManagerFacade.catalogByTsNullable(localMinimum) == null) {
- LOG.info("Catalog compaction skipped, nothing to compact
[timestamp={}].", localMinimum);
+ // Find the minimum time among all partitions.
+ long partitionMinTime = Long.MAX_VALUE;
- return CompletableFutures.nullCompletedFuture();
+ for (Map.Entry<TablePartitionId, Long> e : partitionStates.entrySet())
{
+ Long state = e.getValue();
+
+ if (state == null) {
+ LOG.debug("Partition state is missing [partition={}].",
e.getKey());
+ return null;
+ }
+
+ partitionMinTime = Math.min(partitionMinTime, state);
}
- return determineGlobalMinimumRequiredTime(topologySnapshot.nodes(),
localMinimum)
+ // Choose the minimum time between the low watermark and the minimum
time among all partitions.
+ long chosenMinTime = Math.min(lwm.longValue(), partitionMinTime);
+
+ LOG.debug("Local minimum required time: [partitionMinTime={},
lowWatermark={}, chosen={}].",
+ partitionMinTime,
+ lwm,
+ chosenMinTime
+ );
+
+ return chosenMinTime;
+ }
+
+ private CompletableFuture<Void> startCompaction(HybridTimestamp lwm,
LogicalTopologySnapshot topologySnapshot) {
+ LOG.info("Catalog compaction started at [lowWaterMark={}].", lwm);
+
+ Long localMinRequiredTime = getMinLocalTime(lwm);
+
+ return determineGlobalMinimumRequiredTime(topologySnapshot.nodes(),
localMinRequiredTime)
.thenComposeAsync(timeHolder -> {
long minRequiredTime = timeHolder.minRequiredTime;
long minActiveTxBeginTime =
timeHolder.minActiveTxBeginTime;
+
Catalog catalog =
catalogManagerFacade.catalogByTsNullable(minRequiredTime);
CompletableFuture<Boolean> catalogCompactionFut;
if (catalog == null) {
- LOG.info("Catalog compaction skipped, nothing to
compact [timestamp={}].", minRequiredTime);
+ LOG.info("Catalog compaction skipped, nothing to
compact [timestamp={}]. No catalog at minRequiredTime.",
Review Comment:
```suggestion
LOG.info("Catalog compaction skipped, nothing to compact [timestamp={}].",
```
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -247,30 +238,57 @@ void triggerCompaction(@Nullable HybridTimestamp lwm) {
return;
}
- lastRunFuture =
startCompaction(logicalTopologyService.localLogicalTopology());
+ lastRunFuture = startCompaction(lwm,
logicalTopologyService.localLogicalTopology());
}
});
}
- private CompletableFuture<Void> startCompaction(LogicalTopologySnapshot
topologySnapshot) {
- long localMinimum = localMinTimeProvider.time();
+ private @Nullable Long getMinLocalTime(HybridTimestamp lwm) {
+ Map<TablePartitionId, @Nullable Long> partitionStates =
localMinTimeProvider.minTimePerPartition();
- if (catalogManagerFacade.catalogByTsNullable(localMinimum) == null) {
- LOG.info("Catalog compaction skipped, nothing to compact
[timestamp={}].", localMinimum);
+ // Find the minimum time among all partitions.
+ long partitionMinTime = Long.MAX_VALUE;
- return CompletableFutures.nullCompletedFuture();
+ for (Map.Entry<TablePartitionId, Long> e : partitionStates.entrySet())
{
+ Long state = e.getValue();
+
+ if (state == null) {
+ LOG.debug("Partition state is missing [partition={}].",
e.getKey());
+ return null;
+ }
+
+ partitionMinTime = Math.min(partitionMinTime, state);
}
- return determineGlobalMinimumRequiredTime(topologySnapshot.nodes(),
localMinimum)
+ // Choose the minimum time between the low watermark and the minimum
time among all partitions.
+ long chosenMinTime = Math.min(lwm.longValue(), partitionMinTime);
+
+ LOG.debug("Local minimum required time: [partitionMinTime={},
lowWatermark={}, chosen={}].",
Review Comment:
```suggestion
LOG.debug("Minimum required time was chosen [partitionMinTime={}, lowWatermark={}, chosen={}].",
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -143,6 +143,9 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
*/
private volatile long minActiveTxBeginTime = UNDEFINED_MIN_TX_TIME;
+ /** {@link #minActiveTxBeginTime} stored for the latest data storage
flush. */
+ private volatile long snapshottedMinActiveTxBeginTime =
UNDEFINED_MIN_TX_TIME;
Review Comment:
Do we really need one more field?
##########
modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java:
##########
@@ -67,11 +77,71 @@
class ItCatalogCompactionTest extends ClusterPerClassIntegrationTest {
private static final int CLUSTER_SIZE = 3;
+ /** How often we update the low water mark. */
+ private static final long LW_UPDATE_TIME_MS =
TimeUnit.SECONDS.toMillis(10);
Review Comment:
Why is the update frequency set to such a large interval? I tried setting it to
500 ms, and the test passed 15 times in a row locally; the duration dropped from
1 min to ~7 sec.
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -370,12 +394,16 @@ CompletableFuture<Void> propagateTimeToLocalReplicas(long
txBeginTime) {
}, executor);
}
- private long determineLocalMinimumRequiredTime() {
- // TODO https://issues.apache.org/jira/browse/IGNITE-22637 Provide
actual minimum required time.
- return HybridTimestamp.MIN_VALUE.longValue();
- }
private CompletableFuture<Boolean> tryCompactCatalog(Catalog catalog,
LogicalTopologySnapshot topologySnapshot) {
+ for (CatalogIndexDescriptor index : catalog.indexes()) {
+ if (index.status() == CatalogIndexStatus.BUILDING ||
index.status() == CatalogIndexStatus.REGISTERED) {
+ LOG.info("Catalog compaction aborted, index construction is
taking place.");
Review Comment:
I see that we log results both inside and outside of this method. Let's pick
one place.
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -538,9 +574,9 @@ private void
handlePrepareToUpdateTimeOnReplicasMessage(NetworkMessage message)
/** Minimum required time provider. */
@FunctionalInterface
- interface MinimumRequiredTimeProvider {
- /** Returns minimum required timestamp. */
- long time();
+ public interface MinimumRequiredTimeProvider {
Review Comment:
Let's move this interface to a separate file.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]