Cyrill commented on code in PR #3219:
URL: https://github.com/apache/ignite-3/pull/3219#discussion_r1512701240
##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java:
##########
@@ -955,6 +958,78 @@ private void scanSingleEntryAndLeaveCursorOpen(IgniteImpl targetNode, TableViewI
assertEquals(initialCursorsCount + 1, targetNode.resourcesRegistry().resources().size());
}
+ @Test
+ public void testCursorsClosedAfterTxClose() throws Exception {
+ TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
+
+ int partId = 0;
+
+ var tblReplicationGrp = new TablePartitionId(tbl.tableId(), partId);
+
+ String leaseholder = waitAndGetPrimaryReplica(node(0), tblReplicationGrp).getLeaseholder();
+
+ IgniteImpl txExecNode = findNodeByName(leaseholder);
+
+ log.info("Transaction will be executed on [node={}].", txExecNode.name());
+
+ IgniteImpl txCrdNode = findNode(1, initialNodes(), n -> !leaseholder.equals(n.name()));
+
+ log.info("Transaction coordinator is [node={}].", txCrdNode.name());
+
+ // Ensure there are no open cursors.
+ assertEquals(0, txExecNode.resourcesRegistry().resources().size());
+
+ preloadData(txCrdNode.tables().table(TABLE_NAME), 10);
+
+ CompletableFuture<Void> txRequestCaptureFut = new CompletableFuture<>();
+ CompletableFuture<Void> txResponseCaptureFut = new CompletableFuture<>();
+
+ InternalTransaction roTx = (InternalTransaction) txCrdNode.transactions().begin(new TransactionOptions().readOnly(true));
+
+ log.info("Run scan in RO [txId={}].", roTx.id());
+
+ txCrdNode.dropMessages((nodeName, msg) -> {
+ if (msg instanceof FinishedTransactionsBatchMessage) {
+ Collection<UUID> transactions = ((FinishedTransactionsBatchMessage) msg).transactions();
+
+ if (transactions.contains(roTx.id())) {
+ // Caught the request containing the finished tx.
+ txRequestCaptureFut.complete(null);
+ } else {
+ log.info("Received FinishedTransactionsBatchMessage without tx [txId={}]", roTx.id());
+ }
+ }
+
+ return false;
+ });
+
+ txExecNode.dropMessages((nodeName, msg) -> {
+ if (msg instanceof FinishedTransactionsBatchResponse) {
+ // Now make sure we received the response for the request with roTx.
+ // By this time the resources should have been cleaned.
+ if (txRequestCaptureFut.isDone()) {
+ txResponseCaptureFut.complete(null);
+ }
+ }
+
+ return false;
+ });
+
+ scanSingleEntryAndLeaveCursorOpen(txExecNode, (TableViewInternal) txCrdNode.tables().table(TABLE_NAME), roTx);
+
+ // After the RO scan there should be one open cursor.
+ assertEquals(1, txExecNode.resourcesRegistry().resources().size());
+
+ roTx.commit();
+
+ // Wait for the cursor cleanup message to arrive.
+ assertThat(txRequestCaptureFut, willCompleteSuccessfully());
+ assertThat(txResponseCaptureFut, willCompleteSuccessfully());
+
+ // Now check that the cursor is closed.
+ assertEquals(0, txExecNode.resourcesRegistry().resources().size());
Review Comment:
I wait for txResponseCaptureFut to complete. Once that response has been
handled, the resources have been cleared.
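
To illustrate the ordering I am relying on, here is a minimal sketch in plain Java with no Ignite classes. Everything in it is hypothetical: the class name `ResponseGateSketch`, the `openCursors` list standing in for the resources registry, and the background task standing in for the remote node. It also assumes that cleanup runs before the response is sent, which is what the test expects.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

class ResponseGateSketch {
    /** Runs the simulated exchange and returns the number of cursors still open. */
    static int runScenario() {
        CompletableFuture<Void> requestSeen = new CompletableFuture<>();
        CompletableFuture<Void> responseSeen = new CompletableFuture<>();

        // Hypothetical stand-in for the registry of open cursors on the remote node.
        List<String> openCursors = new CopyOnWriteArrayList<>();
        openCursors.add("cursor-1");

        // Hypothetical remote node: observes the request, cleans up, then responds.
        CompletableFuture.runAsync(() -> {
            requestSeen.complete(null);       // the request with the finished tx was observed
            openCursors.clear();              // assumed: cleanup happens while handling the request
            if (requestSeen.isDone()) {
                responseSeen.complete(null);  // the response is only counted after the request
            }
        });

        // The test thread waits for the response, not just the request.
        responseSeen.orTimeout(5, TimeUnit.SECONDS).join();

        // Completing the future happens-before join() returns, so the cleanup is visible here.
        return openCursors.size();
    }

    public static void main(String[] args) {
        System.out.println("Open cursors after response: " + runScenario());
    }
}
```

Waiting only on the request future would leave a window in which the request has been seen but the cleanup has not run yet; gating the final assertion on the response future is meant to close that window.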