This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new a1db72395c ARTEMIS-1691 JMS bridge can't be manually restarted after failure
a1db72395c is described below

commit a1db72395c055069439870ca147ebde6a0d16f44
Author: Justin Bertram <jbert...@apache.org>
AuthorDate: Fri Apr 12 15:32:17 2024 -0500

    ARTEMIS-1691 JMS bridge can't be manually restarted after failure
---
 .../artemis/jms/bridge/impl/JMSBridgeImpl.java     |  19 ++-
 .../integration/jms/bridge/JMSBridgeImplTest.java  | 159 +++++++++++++++++++++
 2 files changed, 175 insertions(+), 3 deletions(-)

diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java
index e5e79ab282..d35973ac4f 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java
@@ -406,6 +406,7 @@ public final class JMSBridgeImpl implements JMSBridge {
       if (ok) {
          connectedSource = true;
          connectedTarget = true;
+         failed = false;
          startSource();
       } else {
          ActiveMQJMSBridgeLogger.LOGGER.errorStartingBridge(bridgeName);
@@ -797,7 +798,7 @@ public final class JMSBridgeImpl implements JMSBridge {
    @Override
    public synchronized void setMaxRetries(final int retries) {
       checkBridgeNotStarted();
-      JMSBridgeImpl.checkValidValue(retries, "MaxRetries");
+      JMSBridgeImpl.checkValidValue(retries, "MaxRetries", true);
 
       maxRetries = retries;
    }
@@ -921,7 +922,7 @@ public final class JMSBridgeImpl implements JMSBridge {
       checkNotNull(sourceDestinationFactory, "sourceDestinationFactory");
       checkNotNull(targetDestinationFactory, "targetDestinationFactory");
       checkValidValue(failureRetryInterval, "failureRetryInterval");
-      checkValidValue(maxRetries, "maxRetries");
+      checkValidValue(maxRetries, "maxRetries", true);
       if (failureRetryInterval == -1 && maxRetries > 0) {
          throw new IllegalArgumentException("If failureRetryInterval == -1 maxRetries must be set to -1");
       }
@@ -953,11 +954,23 @@ public final class JMSBridgeImpl implements JMSBridge {
    }
 
    /**
-    * Check that value is either equals to -1 or greater than 0
+    * Check that value is either equals to -1 or > 0
     *
     * @throws IllegalArgumentException if the value is not valid
     */
    private static void checkValidValue(final long value, final String name) {
+      checkValidValue(value, name, false);
+   }
+
+   /**
+    * Check that value is either equals to -1 or >= 0
+    *
+    * @throws IllegalArgumentException if the value is not valid
+    */
+   private static void checkValidValue(final long value, final String name, boolean allowZero) {
+      if (value == 0 && allowZero) {
+         return;
+      }
       if (!(value == -1 || value > 0)) {
          throw new IllegalArgumentException(name + " must be > 0 or -1");
       }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/bridge/JMSBridgeImplTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/bridge/JMSBridgeImplTest.java
new file mode 100644
index 0000000000..0a20b2c1eb
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/bridge/JMSBridgeImplTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.bridge;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.transaction.TransactionManager;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.api.jms.JMSFactoryType;
+import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory;
+import org.apache.activemq.artemis.jms.bridge.DestinationFactory;
+import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode;
+import org.apache.activemq.artemis.jms.bridge.impl.JMSBridgeImpl;
+import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class JMSBridgeImplTest extends ActiveMQTestBase {
+
+   private static final String SOURCE = RandomUtil.randomString();
+
+   private static final String TARGET = RandomUtil.randomString();
+
+   private ActiveMQServer server;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      server = createServer(false, createDefaultInVMConfig());
+      server.start();
+
+      server.createQueue(new QueueConfiguration(SOURCE).setRoutingType(RoutingType.ANYCAST));
+      server.createQueue(new QueueConfiguration(TARGET).setRoutingType(RoutingType.ANYCAST));
+   }
+
+   private static ConnectionFactory createConnectionFactory() {
+      ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory) ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(InVMConnectorFactory.class.getName()));
+      cf.setReconnectAttempts(0);
+      cf.setBlockOnNonDurableSend(true);
+      cf.setBlockOnDurableSend(true);
+      return cf;
+   }
+
+   @Test
+   public void testExceptionOnSourceAndManualRestartSucceeds() throws Exception {
+      final AtomicReference<Connection> sourceConn = new AtomicReference<>();
+      ActiveMQJMSConnectionFactory failingSourceCF = new ActiveMQJMSConnectionFactory(false, new TransportConfiguration(InVMConnectorFactory.class.getName())) {
+         private static final long serialVersionUID = -8866390811966688830L;
+
+         @Override
+         public Connection createConnection() throws JMSException {
+            sourceConn.set(super.createConnection());
+            return sourceConn.get();
+         }
+      };
+      // Note! We disable automatic reconnection on the session factory. The bridge needs to do the reconnection
+      failingSourceCF.setReconnectAttempts(0);
+      failingSourceCF.setBlockOnNonDurableSend(true);
+      failingSourceCF.setBlockOnDurableSend(true);
+
+      ConnectionFactoryFactory sourceCFF = () -> failingSourceCF;
+      ConnectionFactoryFactory targetCFF = () -> createConnectionFactory();
+      DestinationFactory sourceDF = () -> ActiveMQJMSClient.createQueue(JMSBridgeImplTest.SOURCE);
+      DestinationFactory targetDF = () -> ActiveMQJMSClient.createQueue(JMSBridgeImplTest.TARGET);
+      TransactionManager tm = Mockito.mock(TransactionManager.class);
+
+      JMSBridgeImpl bridge = new JMSBridgeImpl();
+      bridge.setSourceConnectionFactoryFactory(sourceCFF);
+      bridge.setSourceDestinationFactory(sourceDF);
+      bridge.setTargetConnectionFactoryFactory(targetCFF);
+      bridge.setTargetDestinationFactory(targetDF);
+      bridge.setFailureRetryInterval(1);
+      bridge.setMaxRetries(0);
+      bridge.setMaxBatchSize(1);
+      bridge.setMaxBatchTime(-1);
+      bridge.setTransactionManager(tm);
+      bridge.setQualityOfServiceMode(QualityOfServiceMode.AT_MOST_ONCE);
+
+      Assert.assertFalse(bridge.isStarted());
+      bridge.start();
+      Assert.assertTrue(bridge.isStarted());
+
+      // make sure the bridge is actually working first
+      Connection targetConn = createConnectionFactory().createConnection();
+      Session targetSess = targetConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer consumer = targetSess.createConsumer(targetDF.createDestination());
+      final List<Message> messages = new LinkedList<>();
+      MessageListener listener = message -> messages.add(message);
+      consumer.setMessageListener(listener);
+      targetConn.start();
+
+      Session sourceSess = sourceConn.get().createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer producer = sourceSess.createProducer(sourceDF.createDestination());
+      producer.send(sourceSess.createTextMessage());
+
+      Wait.assertEquals(1, () -> messages.size(), 2000, 100);
+
+      sourceConn.get().getExceptionListener().onException(new JMSException("exception on the source"));
+      Wait.assertTrue(() -> bridge.isFailed(), 2000, 50);
+      targetConn.close();
+      bridge.stop();
+      Assert.assertFalse(bridge.isStarted());
+      bridge.start();
+      Assert.assertTrue(bridge.isStarted());
+
+      // test the bridge again after it's been restarted to ensure it's working
+      targetConn = JMSBridgeImplTest.createConnectionFactory().createConnection();
+      targetSess = targetConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      consumer = targetSess.createConsumer(targetDF.createDestination());
+      messages.clear();
+      consumer.setMessageListener(listener);
+      targetConn.start();
+
+      sourceSess = sourceConn.get().createSession(false, Session.AUTO_ACKNOWLEDGE);
+      producer = sourceSess.createProducer(sourceDF.createDestination());
+      producer.send(sourceSess.createTextMessage());
+
+      Wait.assertEquals(1, () -> messages.size(), 2000, 100);
+      targetConn.close();
+      sourceConn.get().close();
+      bridge.stop();
+      Assert.assertFalse(bridge.isStarted());
+   }
+}

Reply via email to