skorotkov commented on code in PR #10178:
URL: https://github.com/apache/ignite/pull/10178#discussion_r938095310
##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java:
##########
@@ -258,6 +259,110 @@ else if (g1Keys.contains(key))
assertEquals(s1, s2);
}
+
+ /**
+ * The test enforces concurrent processing of the same prepared transaction both in the
+ * tx recovery procedure started because the primary node left and in the tx recovery request handler
+ * invoked by a message from another backup node.
+ * <ul>
+ * <li>Start 3 nodes (g0, g1, g2) and a cache with 2 backups.</li>
+ * <li>Prepare a transaction with g2 as the primary node.</li>
+ * <li>Kill g2.</li>
+ * <li>Enforce concurrent processing of the transaction in tx recovery on g1.</li>
+ * </ul>
+ * Several attempts are used to reproduce the race condition.
+ * <p>
+ * Expected result: the transaction is finished on both g0 and g1.
+ */
+ @Test
+ public void testRecoveryNotDeadLockOnPrimaryFail() throws Exception {
+ backups = 2;
+ persistence = false;
+
+ for (int iter = 0; iter < 75; iter++) {
+ stopAllGrids();
+
+ log.info("iteration=" + iter);
+ final IgniteEx grid0 = startGrid("g0");
+ final IgniteEx grid1 = startGrid("g1",
+ cfg ->
cfg.setSystemThreadPoolSize(1).setStripedPoolSize(1));
+ final IgniteEx grid2 = startGrid("g2");
+
+ grid0.cluster().state(ACTIVE);
+
+ final IgniteCache<Object, Object> cache =
grid2.cache(DEFAULT_CACHE_NAME);
+
+ final Integer g2Key = primaryKeys(grid2.cache(DEFAULT_CACHE_NAME),
1, 0).get(0);
+
+ List<IgniteInternalTx> txs0 = null;
+ List<IgniteInternalTx> txs1 = null;
+
+ CountDownLatch grid1BlockLatch = new CountDownLatch(1);
+
+ try (final Transaction tx =
grid2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(g2Key, Boolean.TRUE);
+ TransactionProxyImpl<?, ?> p = (TransactionProxyImpl<?, ?>)tx;
+ p.tx().prepare(true);
+
+ txs0 = txs(grid0);
+ txs1 = txs(grid1);
+ List<IgniteInternalTx> txs2 = txs(grid2);
+
+ assertTrue(txs0.size() == 1);
+ assertTrue(txs1.size() == 1);
+ assertTrue(txs2.size() == 1);
+
+ // Prevent tx recovery request to be sent from grid0 to grid1.
+ spi(grid0).blockMessages(GridCacheTxRecoveryRequest.class,
grid1.name());
+
+ // Block recovery procedure processing on grid1
+ grid1.context().pools().getSystemExecutorService().execute(()
-> U.awaitQuiet(grid1BlockLatch));
+ // Block stripe tx recovery request processing on grid1
+ int stripe = U.safeAbs(p.tx().xidVersion().hashCode());
+
grid1.context().pools().getStripedExecutorService().execute(stripe, () ->
U.awaitQuiet(grid1BlockLatch));
+
+ // Prevent finish request processing on grid0 and grid1.
+ spi(grid2).blockMessages(GridDhtTxFinishRequest.class,
grid0.name());
+ spi(grid2).blockMessages(GridDhtTxFinishRequest.class,
grid1.name());
+
+ runAsync(() -> {
+ grid2.close();
+ return null;
+ });
+ }
+ catch (Exception ignored) {
+ // Expected.
+ }
+
+ // Wait until grid0 is ready to send the tx recovery request to grid1.
+ spi(grid0).waitForBlocked();
+ // Let grid0 send the tx recovery request to grid1
+ log.info("unblock grid0");
+ spi(grid0).stopBlock();
+ // Give grid1 some time to receive the tx recovery request (processing is still blocked in grid1).
+ log.info("sleep in grid0");
+ doSleep(100);
+
+ // Unblock processing in grid1 simultaneously in the striped and system pools, so that the
+ // recovery procedure and the tx recovery request processing start at the "same" moment
+ // (for the same transaction). This should increase the chances of the race condition
+ // occurring in IgniteTxAdapter::markFinalizing.
+ log.info("unblock grid1");
Review Comment:
Removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]