Copilot commented on code in PR #6897:
URL: https://github.com/apache/ignite-3/pull/6897#discussion_r2493453821
##########
modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java:
##########
@@ -138,63 +145,83 @@ void testGlobalMinimumTxRequiredTime() {
IgniteImpl node1 = unwrapIgniteImpl(CLUSTER.node(1));
IgniteImpl node2 = unwrapIgniteImpl(CLUSTER.node(2));
+ DebugInfoCollector debug = new DebugInfoCollector(List.of(node0,
node1, node2));
+
List<CatalogCompactionRunner> compactors = List.of(
node0.catalogCompactionRunner(),
node1.catalogCompactionRunner(),
node2.catalogCompactionRunner()
);
- Catalog catalog1 = getLatestCatalog(node2);
+ debug.recordGlobalTxState("init");
+ debug.recordCatalogState("init");
+ debug.recordMinTxTimesState("init");
- Transaction tx1 = beginTx(node0, false);
+ Catalog catalog1 = getLatestCatalog(node2);
+ InternalTransaction tx1 = beginTx(node0, false);
+ debug.recordTx(tx1);
// Changing the catalog and starting transaction.
sql("create table a(a int primary key)");
Catalog catalog2 = getLatestCatalog(node0);
assertThat(catalog2.version(), is(catalog1.version() + 1));
- List<Transaction> txs2 = Stream.of(node1, node2).map(node ->
beginTx(node, false)).collect(Collectors.toList());
+ List<InternalTransaction> txs2 = Stream.of(node1, node2).map(node ->
beginTx(node, false)).collect(Collectors.toList());
List<InternalTransaction> ignoredReadonlyTxs = Stream.of(node0, node1,
node2)
.map(node -> beginTx(node, true))
.collect(Collectors.toList());
+ debug.recordTx(txs2);
+ debug.recordTx(ignoredReadonlyTxs);
+
// Changing the catalog again and starting transaction.
sql("alter table a add column (b int)");
- Awaitility.await().untilAsserted(() ->
assertThat(getLatestCatalogVersion(node1), is(catalog2.version() + 1)));
+ Awaitility.await().until(() -> getLatestCatalogVersion(node1),
is(catalog2.version() + 1));
Review Comment:
[nitpick] This wait was changed from `untilAsserted()` to `until()`,
but the semantics differ: `until(supplier, matcher)` polls the supplier until
its value satisfies the matcher (and returns that value), while
`untilAsserted()` re-runs the assertion until it passes without throwing. The
new form works but is less explicit about what's being tested. Consider keeping
`untilAsserted()` unless there's a specific reason to change it.
```suggestion
Awaitility.await().untilAsserted(() ->
assertThat(getLatestCatalogVersion(node1), is(catalog2.version() + 1)));
```
##########
modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java:
##########
@@ -138,63 +145,83 @@ void testGlobalMinimumTxRequiredTime() {
IgniteImpl node1 = unwrapIgniteImpl(CLUSTER.node(1));
IgniteImpl node2 = unwrapIgniteImpl(CLUSTER.node(2));
+ DebugInfoCollector debug = new DebugInfoCollector(List.of(node0,
node1, node2));
+
List<CatalogCompactionRunner> compactors = List.of(
node0.catalogCompactionRunner(),
node1.catalogCompactionRunner(),
node2.catalogCompactionRunner()
);
- Catalog catalog1 = getLatestCatalog(node2);
+ debug.recordGlobalTxState("init");
+ debug.recordCatalogState("init");
+ debug.recordMinTxTimesState("init");
- Transaction tx1 = beginTx(node0, false);
+ Catalog catalog1 = getLatestCatalog(node2);
+ InternalTransaction tx1 = beginTx(node0, false);
+ debug.recordTx(tx1);
// Changing the catalog and starting transaction.
sql("create table a(a int primary key)");
Catalog catalog2 = getLatestCatalog(node0);
assertThat(catalog2.version(), is(catalog1.version() + 1));
- List<Transaction> txs2 = Stream.of(node1, node2).map(node ->
beginTx(node, false)).collect(Collectors.toList());
+ List<InternalTransaction> txs2 = Stream.of(node1, node2).map(node ->
beginTx(node, false)).collect(Collectors.toList());
List<InternalTransaction> ignoredReadonlyTxs = Stream.of(node0, node1,
node2)
.map(node -> beginTx(node, true))
.collect(Collectors.toList());
+ debug.recordTx(txs2);
+ debug.recordTx(ignoredReadonlyTxs);
+
// Changing the catalog again and starting transaction.
sql("alter table a add column (b int)");
- Awaitility.await().untilAsserted(() ->
assertThat(getLatestCatalogVersion(node1), is(catalog2.version() + 1)));
+ Awaitility.await().until(() -> getLatestCatalogVersion(node1),
is(catalog2.version() + 1));
Catalog catalog3 = getLatestCatalog(node1);
- List<Transaction> txs3 = Stream.of(node0, node2).map(node ->
beginTx(node, false)).collect(Collectors.toList());
+ List<InternalTransaction> txs3 = Stream.of(node0, node2).map(node ->
beginTx(node, false)).collect(Collectors.toList());
+
+ debug.recordTx(txs3);
Collection<InternalClusterNode> topologyNodes =
node0.cluster().nodes().stream()
.map(ClusterNodeImpl::fromPublicClusterNode)
.collect(toUnmodifiableList());
- compactors.forEach(compactor -> {
- TimeHolder timeHolder =
await(compactor.determineGlobalMinimumRequiredTime(topologyNodes, 0L));
- assertThat(timeHolder.txMinRequiredTime, is(catalog1.time()));
- });
+ for (int i = 0; i < compactors.size(); i++) {
+ TimeHolder timeHolder =
await(compactors.get(i).determineGlobalMinimumRequiredTime(topologyNodes, 0L));
+
+ String failureMessage = "Initial condition failed on node #" + i;
+
+ assertEquals(catalog1.time(), timeHolder.txMinRequiredTime, () ->
debug.dumpDebugInfo(failureMessage));
+ }
tx1.rollback();
- compactors.forEach(compactor -> {
- TimeHolder timeHolder =
await(compactor.determineGlobalMinimumRequiredTime(topologyNodes, 0L));
- assertThat(timeHolder.txMinRequiredTime, is(catalog2.time()));
- });
+ for (int i = 0; i < compactors.size(); i++) {
+ TimeHolder timeHolder =
await(compactors.get(i).determineGlobalMinimumRequiredTime(topologyNodes, 0L));
+
+ String failureMessage = "Condition failed after first tx rollback
on node #" + i;
+
+ assertEquals(catalog2.time(), timeHolder.txMinRequiredTime, () ->
debug.dumpDebugInfo(failureMessage));
+ }
txs2.forEach(Transaction::commit);
- compactors.forEach(compactor -> {
- TimeHolder timeHolder =
await(compactor.determineGlobalMinimumRequiredTime(topologyNodes, 0L));
- assertThat(timeHolder.txMinRequiredTime, is(catalog3.time()));
- });
+ for (int i = 0; i < compactors.size(); i++) {
+ TimeHolder timeHolder =
await(compactors.get(i).determineGlobalMinimumRequiredTime(topologyNodes, 0L));
+
+ String failureMessage = "Condition failed after transactions
commit on node #" + i;
+
+ assertEquals(catalog3.time(), timeHolder.txMinRequiredTime, () ->
debug.dumpDebugInfo(failureMessage));
+ }
txs3.forEach(Transaction::rollback);
// Since there are no active RW transactions in the cluster, the
minimum time will be min(now()) across all nodes.
- compactors.forEach(compactor -> {
+ for (int i = 0; i < compactors.size(); i++) {
long minTime = Stream.of(node0, node1, node2).map(node ->
node.clockService().nowLong()).min(Long::compareTo).orElseThrow();
- TimeHolder timeHolder =
await(compactor.determineGlobalMinimumRequiredTime(topologyNodes, 0L));
+ TimeHolder timeHolder =
await(compactors.get(i).determineGlobalMinimumRequiredTime(topologyNodes, 0L));
long maxTime = Stream.of(node0, node1, node2).map(node ->
node.clockService().nowLong()).min(Long::compareTo).orElseThrow();
Review Comment:
This should use `.max(Long::compareTo)` instead of `.min(Long::compareTo)`
to correctly compute the maximum time across nodes. Currently this is computing
the minimum value again, making the assertion range invalid.
```suggestion
long maxTime = Stream.of(node0, node1, node2).map(node ->
node.clockService().nowLong()).max(Long::compareTo).orElseThrow();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]