sanpwc commented on code in PR #3092: URL: https://github.com/apache/ignite-3/pull/3092#discussion_r1470662339
########## modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java: ########## @@ -2151,12 +2157,150 @@ public void testYoungerTransactionThrowsExceptionIfKeyLockedByOlderTransactionWi assertThrows(TransactionException.class, () -> keyValueView.put(youngNormalTx, 1L, "normal")); } + @RepeatedTest(10) + public void testTransactionMultiThreadedCommit() { + testTransactionMultiThreadedFinish(1); + } + + @RepeatedTest(10) + public void testTransactionMultiThreadedRollback() { + testTransactionMultiThreadedFinish(0); + } + + @RepeatedTest(10) + public void testTransactionMultiThreadedMixed() { + testTransactionMultiThreadedFinish(-1); + } + + /** + * Test trying to finish a tx in multiple threads simultaneously, and enlist new operations right after the first finish. + * + * @param finishMode 1 is commit, 0 is rollback, otherwise random outcome. + */ + private void testTransactionMultiThreadedFinish(int finishMode) { + var rv = accounts.recordView(); + + rv.upsert(null, makeValue(1, 1.)); + + Transaction tx = igniteTransactions.begin(); + + var txId = ((ReadWriteTransactionImpl) tx).id(); + + log.info("Started transaction {}", txId); + + rv.upsert(tx, makeValue(1, 100.)); + rv.upsert(tx, makeValue(2, 200.)); + + int threadNum = Runtime.getRuntime().availableProcessors() * 5; + + CyclicBarrier b = new CyclicBarrier(threadNum); + CountDownLatch finishLatch = new CountDownLatch(1); + + List<Exception> enlistExceptions = synchronizedList(new ArrayList<>()); + + var futEnlists = runMultiThreadedAsync(() -> { + finishLatch.await(); + var rnd = ThreadLocalRandom.current(); + + try { + if (rnd.nextBoolean()) { + rv.upsert(tx, makeValue(2, 200.)); + } else { + rv.get(tx, makeKey(1)); + } + } catch (Exception e) { + enlistExceptions.add(e); + } + + return null; + }, threadNum, "txCommitTestThread"); + + var futFinishes = runMultiThreadedAsync(() -> { + b.await(); + + finishTx(tx, finishMode); Review Comment: So, seems that you only check empty transactions finish, is that correct? I'd rather make it variable, meaning that we may add one more boolean parameter emptyTransaction to testTransactionMultiThreadedFinish() and if it's false perform some inserts within tx scope before initial finish. ########## modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java: ########## @@ -2151,12 +2157,150 @@ public void testYoungerTransactionThrowsExceptionIfKeyLockedByOlderTransactionWi assertThrows(TransactionException.class, () -> keyValueView.put(youngNormalTx, 1L, "normal")); } + @RepeatedTest(10) + public void testTransactionMultiThreadedCommit() { + testTransactionMultiThreadedFinish(1); + } + + @RepeatedTest(10) + public void testTransactionMultiThreadedRollback() { + testTransactionMultiThreadedFinish(0); + } + + @RepeatedTest(10) + public void testTransactionMultiThreadedMixed() { + testTransactionMultiThreadedFinish(-1); + } + + /** + * Test trying to finish a tx in multiple threads simultaneously, and enlist new operations right after the first finish. + * + * @param finishMode 1 is commit, 0 is rollback, otherwise random outcome. + */ + private void testTransactionMultiThreadedFinish(int finishMode) { + var rv = accounts.recordView(); + + rv.upsert(null, makeValue(1, 1.)); + + Transaction tx = igniteTransactions.begin(); + + var txId = ((ReadWriteTransactionImpl) tx).id(); + + log.info("Started transaction {}", txId); + + rv.upsert(tx, makeValue(1, 100.)); + rv.upsert(tx, makeValue(2, 200.)); + + int threadNum = Runtime.getRuntime().availableProcessors() * 5; + + CyclicBarrier b = new CyclicBarrier(threadNum); + CountDownLatch finishLatch = new CountDownLatch(1); + + List<Exception> enlistExceptions = synchronizedList(new ArrayList<>()); + + var futEnlists = runMultiThreadedAsync(() -> { + finishLatch.await(); + var rnd = ThreadLocalRandom.current(); + + try { + if (rnd.nextBoolean()) { + rv.upsert(tx, makeValue(2, 200.)); + } else { + rv.get(tx, makeKey(1)); + } + } catch (Exception e) { + enlistExceptions.add(e); + } + + return null; + }, threadNum, "txCommitTestThread"); + + var futFinishes = runMultiThreadedAsync(() -> { + b.await(); + + finishTx(tx, finishMode); + + finishLatch.countDown(); + + return null; + }, threadNum, "txCommitTestThread"); + + assertThat(futFinishes, willSucceedFast()); + assertThat(futEnlists, willSucceedFast()); + + assertEquals(threadNum, enlistExceptions.size()); + + for (var e : enlistExceptions) { + assertInstanceOf(TransactionException.class, e); + } + + assertTrue(CollectionUtils.nullOrEmpty(txManager(accounts).lockManager().locks(txId))); + } + + /** + * Test trying to finish a read only tx in multiple threads simultaneously. + */ + @RepeatedTest(10) + public void testReadOnlyTransactionMultiThreadedFinish() { + var rv = accounts.recordView(); + + rv.upsert(null, makeValue(1, 1.)); + + Transaction tx = igniteTransactions.begin(new TransactionOptions().readOnly(true)); + + rv.get(tx, makeKey(1)); + + int threadNum = Runtime.getRuntime().availableProcessors(); + + CyclicBarrier b = new CyclicBarrier(threadNum); + CountDownLatch finishLatch = new CountDownLatch(1); + + var futFinishes = runMultiThreadedAsync(() -> { + b.await(); + + finishTx(tx, -1); Review Comment: I believe that we should also check get/scan "enlist" attempts into finished RO transaction. ########## modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java: ########## @@ -2151,12 +2157,150 @@ public void testYoungerTransactionThrowsExceptionIfKeyLockedByOlderTransactionWi assertThrows(TransactionException.class, () -> keyValueView.put(youngNormalTx, 1L, "normal")); } + @RepeatedTest(10) + public void testTransactionMultiThreadedCommit() { + testTransactionMultiThreadedFinish(1); + } + + @RepeatedTest(10) + public void testTransactionMultiThreadedRollback() { + testTransactionMultiThreadedFinish(0); + } + + @RepeatedTest(10) + public void testTransactionMultiThreadedMixed() { + testTransactionMultiThreadedFinish(-1); + } + + /** + * Test trying to finish a tx in multiple threads simultaneously, and enlist new operations right after the first finish. + * + * @param finishMode 1 is commit, 0 is rollback, otherwise random outcome. + */ + private void testTransactionMultiThreadedFinish(int finishMode) { + var rv = accounts.recordView(); + + rv.upsert(null, makeValue(1, 1.)); + + Transaction tx = igniteTransactions.begin(); + + var txId = ((ReadWriteTransactionImpl) tx).id(); + + log.info("Started transaction {}", txId); + + rv.upsert(tx, makeValue(1, 100.)); + rv.upsert(tx, makeValue(2, 200.)); + + int threadNum = Runtime.getRuntime().availableProcessors() * 5; + + CyclicBarrier b = new CyclicBarrier(threadNum); + CountDownLatch finishLatch = new CountDownLatch(1); + + List<Exception> enlistExceptions = synchronizedList(new ArrayList<>()); + + var futEnlists = runMultiThreadedAsync(() -> { + finishLatch.await(); + var rnd = ThreadLocalRandom.current(); + + try { + if (rnd.nextBoolean()) { + rv.upsert(tx, makeValue(2, 200.)); + } else { + rv.get(tx, makeKey(1)); + } + } catch (Exception e) { + enlistExceptions.add(e); + } + + return null; + }, threadNum, "txCommitTestThread"); + + var futFinishes = runMultiThreadedAsync(() -> { + b.await(); + + finishTx(tx, finishMode); + + finishLatch.countDown(); + + return null; + }, threadNum, "txCommitTestThread"); + + assertThat(futFinishes, willSucceedFast()); + assertThat(futEnlists, willSucceedFast()); + + assertEquals(threadNum, enlistExceptions.size()); + + for (var e : enlistExceptions) { + assertInstanceOf(TransactionException.class, e); Review Comment: Let's also assert the the code and message, at least stable part of a message. ########## modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java: ########## @@ -2151,12 +2157,150 @@ public void testYoungerTransactionThrowsExceptionIfKeyLockedByOlderTransactionWi assertThrows(TransactionException.class, () -> keyValueView.put(youngNormalTx, 1L, "normal")); } + @RepeatedTest(10) + public void testTransactionMultiThreadedCommit() { + testTransactionMultiThreadedFinish(1); + } + + @RepeatedTest(10) + public void testTransactionMultiThreadedRollback() { + testTransactionMultiThreadedFinish(0); + } + + @RepeatedTest(10) + public void testTransactionMultiThreadedMixed() { + testTransactionMultiThreadedFinish(-1); + } + + /** + * Test trying to finish a tx in multiple threads simultaneously, and enlist new operations right after the first finish. + * + * @param finishMode 1 is commit, 0 is rollback, otherwise random outcome. + */ + private void testTransactionMultiThreadedFinish(int finishMode) { + var rv = accounts.recordView(); + + rv.upsert(null, makeValue(1, 1.)); + + Transaction tx = igniteTransactions.begin(); + + var txId = ((ReadWriteTransactionImpl) tx).id(); + + log.info("Started transaction {}", txId); + + rv.upsert(tx, makeValue(1, 100.)); + rv.upsert(tx, makeValue(2, 200.)); + + int threadNum = Runtime.getRuntime().availableProcessors() * 5; + + CyclicBarrier b = new CyclicBarrier(threadNum); + CountDownLatch finishLatch = new CountDownLatch(1); + + List<Exception> enlistExceptions = synchronizedList(new ArrayList<>()); + + var futEnlists = runMultiThreadedAsync(() -> { + finishLatch.await(); + var rnd = ThreadLocalRandom.current(); + + try { + if (rnd.nextBoolean()) { + rv.upsert(tx, makeValue(2, 200.)); + } else { + rv.get(tx, makeKey(1)); + } + } catch (Exception e) { + enlistExceptions.add(e); + } + + return null; + }, threadNum, "txCommitTestThread"); + + var futFinishes = runMultiThreadedAsync(() -> { + b.await(); + + finishTx(tx, finishMode); + + finishLatch.countDown(); + + return null; + }, threadNum, "txCommitTestThread"); + + assertThat(futFinishes, willSucceedFast()); + assertThat(futEnlists, willSucceedFast()); + + assertEquals(threadNum, enlistExceptions.size()); + + for (var e : enlistExceptions) { + assertInstanceOf(TransactionException.class, e); + } + + assertTrue(CollectionUtils.nullOrEmpty(txManager(accounts).lockManager().locks(txId))); + } + + /** + * Test trying to finish a read only tx in multiple threads simultaneously. + */ + @RepeatedTest(10) + public void testReadOnlyTransactionMultiThreadedFinish() { + var rv = accounts.recordView(); + + rv.upsert(null, makeValue(1, 1.)); + + Transaction tx = igniteTransactions.begin(new TransactionOptions().readOnly(true)); + + rv.get(tx, makeKey(1)); + + int threadNum = Runtime.getRuntime().availableProcessors(); + + CyclicBarrier b = new CyclicBarrier(threadNum); + CountDownLatch finishLatch = new CountDownLatch(1); Review Comment: There's no finishLatch.await() in this test. ########## modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java: ########## @@ -132,10 +132,7 @@ private boolean hasTxFinalizationBegun() { /** {@inheritDoc} */ @Override protected CompletableFuture<Void> finish(boolean commit) { - if (hasTxFinalizationBegun()) { Review Comment: Could you please explain why it was removed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org