skorotkov commented on code in PR #10178:
URL: https://github.com/apache/ignite/pull/10178#discussion_r939662093
##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java:
##########
@@ -258,6 +264,129 @@ else if (g1Keys.contains(key))
assertEquals(s1, s2);
}
+
+    /**
+     * The test enforces concurrent processing of the same prepared transaction
+     * both in the tx recovery procedure started because the primary node left
+     * and in the tx recovery request handler invoked by a message from another
+     * backup node.
+     * <p>
+     * The idea is to have a 3-node cluster and a cache with 2 backups, so there
+     * are 2 backup nodes to execute the tx recovery in parallel if the primary
+     * node fails. These backup nodes send tx recovery requests to each other,
+     * so the tx recovery request handler is invoked as well.
+     * <p>
+     * Several attempts are used to reproduce the race condition.
+     * <p>
+     * Expected result: the transaction is finished on both backup nodes and
+     * the partition map exchange completes as well.
+     */
+    @Test
+    public void testRecoveryNotDeadLockOnPrimaryFail() throws Exception {
+        backups = 2;
+        persistence = false;
+
+        for (int iter = 0; iter < 100; iter++) {
+            stopAllGrids();
+
+            log.info("iteration=" + iter);
+
+            final IgniteEx grid0 = startGrid(0);
+
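+            // Single-thread system and striped pools on grid1: one blocking task
+            // submitted to either pool stalls all of its processing.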
+            final IgniteEx grid1 = startGrid(1, (UnaryOperator<IgniteConfiguration>)cfg ->
+                cfg.setSystemThreadPoolSize(1).setStripedPoolSize(1));
+
+            final IgniteEx grid2 = startGrid(2);
+
+            grid0.cluster().state(ACTIVE);
+
+            final IgniteCache<Object, Object> cache = grid2.cache(DEFAULT_CACHE_NAME);
+
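+            // A key whose primary copy is owned by grid2; with 2 backups, grid0 and
+            // grid1 each hold a backup copy.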
+            final Integer g2Key = primaryKeys(cache, 1, 0).get(0);
+
+            List<IgniteInternalTx> txs0 = null;
+            List<IgniteInternalTx> txs1 = null;
+            List<IgniteInternalTx> txs2;
+
+            final CountDownLatch grid1BlockLatch = new CountDownLatch(1);
+
+            final CountDownLatch grid1NodeLeftEventLatch = new CountDownLatch(1);
+
+            int[] stripeHolder = new int[1];
+
+            try (final Transaction tx = grid2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                cache.put(g2Key, Boolean.TRUE);
+
+                TransactionProxyImpl<?, ?> p = (TransactionProxyImpl<?, ?>)tx;
+
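+                // Prepare the tx on all nodes without committing: it stays PREPARED, which
+                // makes it a subject of the tx recovery procedure once the primary node fails.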
+                p.tx().prepare(true);
+
+                txs0 = txs(grid0);
+                txs1 = txs(grid1);
+                txs2 = txs(grid2);
+
+                assertEquals(1, txs0.size());
+                assertEquals(1, txs1.size());
+                assertEquals(1, txs2.size());
+
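+                // Signal once grid1 observes a node leaving the cluster (the primary, in this scenario).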
+                grid1.events().localListen(new PE() {
+                    @Override public boolean apply(Event evt) {
+                        grid1NodeLeftEventLatch.countDown();
+
+                        return true;
+                    }
+                }, EventType.EVT_NODE_LEFT);
+
+                // Block recovery procedure processing on grid1
+                grid1.context().pools().getSystemExecutorService().execute(() -> U.awaitQuiet(grid1BlockLatch));
+
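+                // The striped executor picks a stripe as (index % pool size); with a striped pool
+                // size of 1, every task, including the incoming tx recovery request, shares one stripe.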
+                int stripe = U.safeAbs(p.tx().xidVersion().hashCode());
+
+                stripeHolder[0] = stripe;
+
+                // Block stripe tx recovery request processing on grid1
+                grid1.context().pools().getStripedExecutorService().execute(stripe, () -> U.awaitQuiet(grid1BlockLatch));
Review Comment:
Looks like it's fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]