This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new a1db72395c ARTEMIS-1691 JMS bridge can't be manually restarted after failure a1db72395c is described below commit a1db72395c055069439870ca147ebde6a0d16f44 Author: Justin Bertram <jbert...@apache.org> AuthorDate: Fri Apr 12 15:32:17 2024 -0500 ARTEMIS-1691 JMS bridge can't be manually restarted after failure --- .../artemis/jms/bridge/impl/JMSBridgeImpl.java | 19 ++- .../integration/jms/bridge/JMSBridgeImplTest.java | 159 +++++++++++++++++++++ 2 files changed, 175 insertions(+), 3 deletions(-) diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java index e5e79ab282..d35973ac4f 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java @@ -406,6 +406,7 @@ public final class JMSBridgeImpl implements JMSBridge { if (ok) { connectedSource = true; connectedTarget = true; + failed = false; startSource(); } else { ActiveMQJMSBridgeLogger.LOGGER.errorStartingBridge(bridgeName); @@ -797,7 +798,7 @@ public final class JMSBridgeImpl implements JMSBridge { @Override public synchronized void setMaxRetries(final int retries) { checkBridgeNotStarted(); - JMSBridgeImpl.checkValidValue(retries, "MaxRetries"); + JMSBridgeImpl.checkValidValue(retries, "MaxRetries", true); maxRetries = retries; } @@ -921,7 +922,7 @@ public final class JMSBridgeImpl implements JMSBridge { checkNotNull(sourceDestinationFactory, "sourceDestinationFactory"); checkNotNull(targetDestinationFactory, "targetDestinationFactory"); checkValidValue(failureRetryInterval, "failureRetryInterval"); - checkValidValue(maxRetries, "maxRetries"); + checkValidValue(maxRetries, "maxRetries", true); if (failureRetryInterval == -1 && maxRetries > 0) { throw new IllegalArgumentException("If failureRetryInterval == -1 maxRetries must be set to -1"); } @@ -953,11 +954,23 @@ public final class JMSBridgeImpl implements JMSBridge { } /** - * Check that value is either equals to -1 or greater than 0 + * Check that value is either equals to -1 or > 0 * * @throws IllegalArgumentException if the value is not valid */ private static void checkValidValue(final long value, final String name) { + checkValidValue(value, name, false); + } + + /** + * Check that value is either equals to -1 or >= 0 + * + * @throws IllegalArgumentException if the value is not valid + */ + private static void checkValidValue(final long value, final String name, boolean allowZero) { + if (value == 0 && allowZero) { + return; + } if (!(value == -1 || value > 0)) { throw new IllegalArgumentException(name + " must be > 0 or -1"); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/bridge/JMSBridgeImplTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/bridge/JMSBridgeImplTest.java new file mode 100644 index 0000000000..0a20b2c1eb --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/bridge/JMSBridgeImplTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.jms.bridge; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.transaction.TransactionManager; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; +import org.apache.activemq.artemis.api.jms.JMSFactoryType; +import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory; +import org.apache.activemq.artemis.jms.bridge.DestinationFactory; +import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode; +import org.apache.activemq.artemis.jms.bridge.impl.JMSBridgeImpl; +import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class JMSBridgeImplTest extends ActiveMQTestBase { + + private static final String SOURCE = RandomUtil.randomString(); + + private static final String TARGET = RandomUtil.randomString(); + + private ActiveMQServer server; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + server = createServer(false, createDefaultInVMConfig()); + server.start(); + + server.createQueue(new QueueConfiguration(SOURCE).setRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(TARGET).setRoutingType(RoutingType.ANYCAST)); + } + + private static ConnectionFactory createConnectionFactory() { + ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName())); + cf.setReconnectAttempts(0); + cf.setBlockOnNonDurableSend(true); + cf.setBlockOnDurableSend(true); + return cf; + } + + @Test + public void testExceptionOnSourceAndManualRestartSucceeds() throws Exception { + final AtomicReference<Connection> sourceConn = new AtomicReference<>(); + ActiveMQJMSConnectionFactory failingSourceCF = new ActiveMQJMSConnectionFactory(false, new TransportConfiguration(InVMConnectorFactory.class.getName())) { + private static final long serialVersionUID = -8866390811966688830L; + + @Override + public Connection createConnection() throws JMSException { + sourceConn.set(super.createConnection()); + return sourceConn.get(); + } + }; + // Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection + failingSourceCF.setReconnectAttempts(0); + failingSourceCF.setBlockOnNonDurableSend(true); + failingSourceCF.setBlockOnDurableSend(true); + + ConnectionFactoryFactory sourceCFF = () -> failingSourceCF; + ConnectionFactoryFactory targetCFF = () -> createConnectionFactory(); + DestinationFactory sourceDF = () -> ActiveMQJMSClient.createQueue(JMSBridgeImplTest.SOURCE); + DestinationFactory targetDF = () -> ActiveMQJMSClient.createQueue(JMSBridgeImplTest.TARGET); + TransactionManager tm = Mockito.mock(TransactionManager.class); + + JMSBridgeImpl bridge = new JMSBridgeImpl(); + bridge.setSourceConnectionFactoryFactory(sourceCFF); + bridge.setSourceDestinationFactory(sourceDF); + bridge.setTargetConnectionFactoryFactory(targetCFF); + bridge.setTargetDestinationFactory(targetDF); + bridge.setFailureRetryInterval(1); + bridge.setMaxRetries(0); + bridge.setMaxBatchSize(1); + bridge.setMaxBatchTime(-1); + bridge.setTransactionManager(tm); + bridge.setQualityOfServiceMode(QualityOfServiceMode.AT_MOST_ONCE); + + Assert.assertFalse(bridge.isStarted()); + bridge.start(); + Assert.assertTrue(bridge.isStarted()); + + // make sure the bridge is actually working first + Connection targetConn = createConnectionFactory().createConnection(); + Session targetSess = targetConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = targetSess.createConsumer(targetDF.createDestination()); + final List<Message> messages = new LinkedList<>(); + MessageListener listener = message -> messages.add(message); + consumer.setMessageListener(listener); + targetConn.start(); + + Session sourceSess = sourceConn.get().createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = sourceSess.createProducer(sourceDF.createDestination()); + producer.send(sourceSess.createTextMessage()); + + Wait.assertEquals(1, () -> messages.size(), 2000, 100); + + sourceConn.get().getExceptionListener().onException(new JMSException("exception on the source")); + Wait.assertTrue(() -> bridge.isFailed(), 2000, 50); + targetConn.close(); + bridge.stop(); + Assert.assertFalse(bridge.isStarted()); + bridge.start(); + Assert.assertTrue(bridge.isStarted()); + + // test the bridge again after it's been restarted to ensure it's working + targetConn = JMSBridgeImplTest.createConnectionFactory().createConnection(); + targetSess = targetConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumer = targetSess.createConsumer(targetDF.createDestination()); + messages.clear(); + consumer.setMessageListener(listener); + targetConn.start(); + + sourceSess = sourceConn.get().createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = sourceSess.createProducer(sourceDF.createDestination()); + producer.send(sourceSess.createTextMessage()); + + Wait.assertEquals(1, () -> messages.size(), 2000, 100); + targetConn.close(); + sourceConn.get().close(); + bridge.stop(); + Assert.assertFalse(bridge.isStarted()); + } +}