ARTEMIS-773 Close connections in finally blocks to avoid thread leakage
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/54b7dcc4 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/54b7dcc4 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/54b7dcc4 Branch: refs/heads/master Commit: 54b7dcc48e3e76eca2a7cca1348e7c2625cc7ba6 Parents: 330ddf0 Author: Clebert Suconic <clebertsuco...@apache.org> Authored: Fri Oct 7 10:57:21 2016 -0400 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Fri Oct 7 11:04:45 2016 -0400 ---------------------------------------------------------------------- .../integration/amqp/AmqpTransactionTest.java | 275 ++++++++++--------- 1 file changed, 141 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54b7dcc4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java index e82e7b3..e42a718 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java @@ -485,73 +485,77 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); - // Root TXN session controls all TXN send lifetimes. 
- AmqpSession txnSession = connection.createSession(); - - // Normal Session which won't create an TXN itself - AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); + try { - for (int i = 0; i < NUM_MESSAGES + 1; ++i) { - AmqpMessage message = new AmqpMessage(); - message.setText("Test-Message"); - message.setApplicationProperty("msgId", i); - sender.send(message, txnSession.getTransactionId()); - } + // Root TXN session controls all TXN send lifetimes. + AmqpSession txnSession = connection.createSession(); - // Read all messages from the Queue, do not accept them yet. - AmqpReceiver receiver = session.createReceiver(getTestName()); - ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES); - receiver.flow((NUM_MESSAGES + 2) * 2); - for (int i = 0; i < NUM_MESSAGES; ++i) { - AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); - assertNotNull(message); - messages.add(message); - } - - // Commit half the consumed messages - txnSession.begin(); - for (int i = 0; i < NUM_MESSAGES / 2; ++i) { - messages.get(i).accept(txnSession); - } - txnSession.commit(); - - // Rollback the other half the consumed messages - txnSession.begin(); - for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) { - messages.get(i).accept(txnSession); - } - txnSession.rollback(); - - { - AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); - assertNotNull(message); - assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId")); - message.release(); - } - - // Commit the other half the consumed messages - // This is a variation from the .NET client tests which doesn't settle the - // messages in the TX until commit is called but on ActiveMQ they will be - // redispatched regardless and not stay in the acquired state. 
- txnSession.begin(); - for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) { - AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); - assertNotNull(message); - message.accept(); - } - txnSession.commit(); + // Normal Session which won't create an TXN itself + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(getTestName()); - // The final message should still be pending. - { - AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); - receiver.flow(1); - assertNotNull(message); - assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId")); - message.release(); + for (int i = 0; i < NUM_MESSAGES + 1; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("msgId", i); + sender.send(message, txnSession.getTransactionId()); + } + + // Read all messages from the Queue, do not accept them yet. + AmqpReceiver receiver = session.createReceiver(getTestName()); + ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES); + receiver.flow((NUM_MESSAGES + 2) * 2); + for (int i = 0; i < NUM_MESSAGES; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + messages.add(message); + } + + // Commit half the consumed messages + txnSession.begin(); + for (int i = 0; i < NUM_MESSAGES / 2; ++i) { + messages.get(i).accept(txnSession); + } + txnSession.commit(); + + // Rollback the other half the consumed messages + txnSession.begin(); + for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) { + messages.get(i).accept(txnSession); + } + txnSession.rollback(); + + { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId")); + message.release(); + } + + // Commit the other half the consumed messages + // This is a variation from the .NET client tests which doesn't settle the + // messages in the TX until commit is 
called but on ActiveMQ they will be + // redispatched regardless and not stay in the acquired state. + txnSession.begin(); + for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(message); + message.accept(); + } + txnSession.commit(); + + // The final message should still be pending. + { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + receiver.flow(1); + assertNotNull(message); + assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId")); + message.release(); + } + + } finally { + connection.close(); } - - connection.close(); } @Test(timeout = 60000) @@ -630,83 +634,86 @@ public class AmqpTransactionTest extends AmqpClientTestSupport { AmqpClient client = createAmqpClient(); AmqpConnection connection = client.connect(); - // Root TXN session controls all TXN send lifetimes. - AmqpSession txnSession = connection.createSession(); - - // Normal Session which won't create an TXN itself - AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(getTestName()); - - for (int i = 0; i < NUM_MESSAGES + 1; ++i) { - AmqpMessage message = new AmqpMessage(); - message.setText("Test-Message"); - message.setApplicationProperty("msgId", i); - sender.send(message, txnSession.getTransactionId()); - } - - // Read all messages from the Queue, do not accept them yet. 
- AmqpReceiver receiver = session.createReceiver(getTestName()); - ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES); - receiver.flow((NUM_MESSAGES + 2) * 2); - for (int i = 0; i < NUM_MESSAGES; ++i) { - AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); - System.out.println("Read message: " + message.getApplicationProperty("msgId")); - assertNotNull(message); - messages.add(message); - } - - // Commit half the consumed messages [0, 1, 2, 3, 4] - txnSession.begin(); - for (int i = 0; i < NUM_MESSAGES / 2; ++i) { - System.out.println("Commit: Accepting message: " + messages.get(i).getApplicationProperty("msgId")); - messages.get(i).accept(txnSession, false); - } - txnSession.commit(); + try { - // Rollback the other half the consumed messages [5, 6, 7, 8, 9] - txnSession.begin(); - for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) { - System.out.println("Rollback: Accepting message: " + messages.get(i).getApplicationProperty("msgId")); - messages.get(i).accept(txnSession, false); - } - txnSession.rollback(); + // Root TXN session controls all TXN send lifetimes. 
+ AmqpSession txnSession = connection.createSession(); - // After rollback messages should still be acquired so we read last sent message [10] - { - AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); - System.out.println("Read message: " + message.getApplicationProperty("msgId")); - assertNotNull(message); - assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId")); - message.release(); - } - - // Commit the other half the consumed messages [5, 6, 7, 8, 9] which should still be acquired - txnSession.begin(); - for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) { - messages.get(i).accept(txnSession); - } - txnSession.commit(); + // Normal Session which won't create an TXN itself + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(getTestName()); - // The final message [10] should still be pending as we released it previously and committed - // the previously accepted but not settled messages [5, 6, 7, 8, 9] in a new TX - { + for (int i = 0; i < NUM_MESSAGES + 1; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setApplicationProperty("msgId", i); + sender.send(message, txnSession.getTransactionId()); + } + + // Read all messages from the Queue, do not accept them yet. 
+ AmqpReceiver receiver = session.createReceiver(getTestName()); + ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES); + receiver.flow((NUM_MESSAGES + 2) * 2); + for (int i = 0; i < NUM_MESSAGES; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + System.out.println("Read message: " + message.getApplicationProperty("msgId")); + assertNotNull(message); + messages.add(message); + } + + // Commit half the consumed messages [0, 1, 2, 3, 4] + txnSession.begin(); + for (int i = 0; i < NUM_MESSAGES / 2; ++i) { + System.out.println("Commit: Accepting message: " + messages.get(i).getApplicationProperty("msgId")); + messages.get(i).accept(txnSession, false); + } + txnSession.commit(); + + // Rollback the other half the consumed messages [5, 6, 7, 8, 9] + txnSession.begin(); + for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) { + System.out.println("Rollback: Accepting message: " + messages.get(i).getApplicationProperty("msgId")); + messages.get(i).accept(txnSession, false); + } + txnSession.rollback(); + + // After rollback messages should still be acquired so we read last sent message [10] + { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + System.out.println("Read message: " + message.getApplicationProperty("msgId")); + assertNotNull(message); + assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId")); + message.release(); + } + + // Commit the other half the consumed messages [5, 6, 7, 8, 9] which should still be acquired + txnSession.begin(); + for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) { + messages.get(i).accept(txnSession); + } + txnSession.commit(); + + // The final message [10] should still be pending as we released it previously and committed + // the previously accepted but not settled messages [5, 6, 7, 8, 9] in a new TX + { + receiver.flow(1); + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + System.out.println("Read message: " + message.getApplicationProperty("msgId")); + 
assertNotNull(message); + assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId")); + message.accept(); + } + + // We should have now drained the Queue receiver.flow(1); AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); - System.out.println("Read message: " + message.getApplicationProperty("msgId")); - assertNotNull(message); - assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId")); - message.accept(); + if (message != null) { + System.out.println("Read message: " + message.getApplicationProperty("msgId")); + } + assertNull(message); + } finally { + connection.close(); } - - // We should have now drained the Queue - receiver.flow(1); - AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); - if (message != null) { - System.out.println("Read message: " + message.getApplicationProperty("msgId")); - } - assertNull(message); - - connection.close(); } @Test(timeout = 60000)