This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push: new c4238e1 ARTEMIS-2327 Removing Bridge Test after fix c4238e1 is described below commit c4238e154fba199060e9870d0ddc9b667eda5179 Author: Clebert Suconic <clebertsuco...@apache.org> AuthorDate: Mon May 6 09:52:07 2019 -0400 ARTEMIS-2327 Removing Bridge Test after fix This test was playing with an ignored packet, which no longer makes sense after the last change. After a packet loss the bridge will now reconnect, so this test is no longer meaningful. --- .../integration/cluster/bridge/BridgeTest.java | 168 +-------------------- 1 file changed, 3 insertions(+), 165 deletions(-) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java index c46fd01..e9d8a21 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java @@ -29,7 +29,6 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -62,25 +61,22 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordId import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache; import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl; import org.apache.activemq.artemis.core.protocol.core.Packet; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendContinuationMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage; import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.cluster.Bridge; -import org.apache.activemq.artemis.core.server.transformer.AddHeadersTransformer; -import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl; import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl; +import org.apache.activemq.artemis.core.server.transformer.AddHeadersTransformer; +import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.RandomUtil; @@ -318,7 +314,6 @@ public class BridgeTest extends ActiveMQTestBase { ClientSession session1 = sf1.createSession(true, true, 0); ClientConsumer consumer1 = session1.createConsumer(queueName1); - session1.start(); final byte[] bytes = new byte[messageSize]; @@ -351,7 +346,7 @@ public class BridgeTest extends ActiveMQTestBase { } session1.commit(); - BridgeImpl bridge = (BridgeImpl)server0.getClusterManager().getBridges().get("bridge1"); + 
BridgeImpl bridge = (BridgeImpl) server0.getClusterManager().getBridges().get("bridge1"); // stop in the middle. wait the bridge to block Wait.assertTrue("bridge is never blocked", bridge::isBlockedOnFlowControl); @@ -379,7 +374,6 @@ public class BridgeTest extends ActiveMQTestBase { message.acknowledge(); } - Wait.assertEquals(0, server0.locateQueue(SimpleString.toSimpleString("queue0"))::getMessageCount); Assert.assertNull(consumer1.receiveImmediate()); @@ -514,160 +508,6 @@ public class BridgeTest extends ActiveMQTestBase { } } - @Test - public void testLostMessageSimpleMessage() throws Exception { - internalTestMessageLoss(false); - } - - @Test - public void testLostMessageLargeMessage() throws Exception { - internalTestMessageLoss(true); - } - - /** - * This test will ignore messages - * What will cause the bridge to fail with a timeout - * The bridge should still recover the failure and reconnect on that case - */ - public void internalTestMessageLoss(final boolean largeMessage) throws Exception { - class MyInterceptor implements Interceptor { - - public boolean ignoreSends = true; - public CountDownLatch latch; - - MyInterceptor(int numberOfIgnores) { - latch = new CountDownLatch(numberOfIgnores); - } - - @Override - public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { - if (ignoreSends && packet instanceof SessionSendMessage || - ignoreSends && packet instanceof SessionSendLargeMessage || - ignoreSends && packet instanceof SessionSendContinuationMessage && !((SessionSendContinuationMessage) packet).isContinues()) { - IntegrationTestLogger.LOGGER.info("IGNORED: " + packet); - latch.countDown(); - return false; - } else { - IntegrationTestLogger.LOGGER.info(packet); - return true; - } - } - - } - - MyInterceptor myInterceptor = new MyInterceptor(3); - - Map<String, Object> server0Params = new HashMap<>(); - server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params); - - Map<String, Object> 
server1Params = new HashMap<>(); - addTargetParameters(server1Params); - server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params); - - final String testAddress = "testAddress"; - final String queueName0 = "queue0"; - final String forwardAddress = "forwardAddress"; - final String queueName1 = "queue1"; - - TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params); - TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params); - - HashMap<String, TransportConfiguration> connectors = new HashMap<>(); - connectors.put(server1tc.getName(), server1tc); - server0.getConfiguration().setConnectorConfigurations(connectors); - - final int messageSize = 1024; - - final int numMessages = 1; - - ArrayList<String> connectorConfig = new ArrayList<>(); - connectorConfig.add(server1tc.getName()); - BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(100).setReconnectAttempts(-1).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(numMessages * messageSize / 2).setStaticConnectors(connectorConfig).setCallTimeout(500); - - List<BridgeConfiguration> bridgeConfigs = new ArrayList<>(); - bridgeConfigs.add(bridgeConfiguration); - server0.getConfiguration().setBridgeConfigurations(bridgeConfigs); - - CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0); - List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>(); - queueConfigs0.add(queueConfig0); - server0.getConfiguration().setQueueConfigurations(queueConfigs0); - - CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1); - List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<>(); - queueConfigs1.add(queueConfig1); - 
server1.getConfiguration().setQueueConfigurations(queueConfigs1); - - server1.start(); - - server1.getRemotingService().addIncomingInterceptor(myInterceptor); - - server0.start(); - locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc)); - ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc)); - - ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc)); - - ClientSession session0 = sf0.createSession(false, true, true); - - ClientSession session1 = sf1.createSession(false, true, true); - - ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress)); - - ClientConsumer consumer1 = session1.createConsumer(queueName1); - - session1.start(); - - final byte[] bytes = new byte[messageSize]; - - final SimpleString propKey = new SimpleString("testkey"); - - for (int i = 0; i < numMessages; i++) { - ClientMessage message = session0.createMessage(true); - - if (largeMessage) { - message.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(10 * 1024)); - } - - message.putIntProperty(propKey, i); - - message.getBodyBuffer().writeBytes(bytes); - - producer0.send(message); - } - - assertTrue("where is the countDown?", myInterceptor.latch.await(30, TimeUnit.SECONDS)); - myInterceptor.ignoreSends = false; - server1.getRemotingService().removeIncomingInterceptor(myInterceptor); - IntegrationTestLogger.LOGGER.info("No longer ignoring packets."); - - for (int i = 0; i < numMessages; i++) { - ClientMessage message = consumer1.receive(30000); - - Assert.assertNotNull(message); - - Assert.assertEquals(i, message.getObjectProperty(propKey)); - - if (largeMessage) { - readLargeMessages(message, 10); - } - - message.acknowledge(); - } - - Assert.assertNull(consumer1.receiveImmediate()); - - session0.close(); - - session1.close(); - - sf0.close(); - - sf1.close(); - closeFields(); - assertEquals("there should be no queues", 0, loadQueues(server0).size()); - } - /** 
* @param server1Params */ @@ -1216,7 +1056,6 @@ public class BridgeTest extends ActiveMQTestBase { final String propKey = "bridged"; final String propValue = "true"; - TransformerConfiguration transformerConfiguration = new TransformerConfiguration(AddHeadersTransformer.class.getName()); transformerConfiguration.getProperties().put(propKey, propValue); @@ -1278,7 +1117,6 @@ public class BridgeTest extends ActiveMQTestBase { final int numMessages = 10; - for (int i = 0; i < numMessages; i++) { ClientMessage message = session0.createMessage(true);