This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new dc1cfa3  ARTEMIS-2290 JMSBridgeImpl::stop is failing when called from 
FailureHandler
     new b62e081  This closes #2598
dc1cfa3 is described below

commit dc1cfa3536087d8dd29b5462775e8a95809ed883
Author: Francesco Nigro <nigro....@gmail.com>
AuthorDate: Tue Mar 19 12:10:07 2019 +0100

    ARTEMIS-2290 JMSBridgeImpl::stop is failing when called from FailureHandler
---
 .../artemis/jms/bridge/impl/JMSBridgeImpl.java     | 428 +++++++++++++--------
 1 file changed, 272 insertions(+), 156 deletions(-)

diff --git 
a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java
 
b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java
index a5e1c0e..2da550b 100644
--- 
a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java
+++ 
b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java
@@ -45,13 +45,16 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.ServiceLoader;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
+import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
 import org.apache.activemq.artemis.api.core.client.FailoverEventType;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
@@ -113,8 +116,8 @@ public final class JMSBridgeImpl implements JMSBridge {
 
    private boolean started;
 
-   private final Object stoppingGuard = new Object();
-   private boolean stopping = false;
+   private static final Object stoppingGuard = new Object();
+   private volatile boolean stopping = false;
 
    private final LinkedList<Message> messages;
 
@@ -142,7 +145,13 @@ public final class JMSBridgeImpl implements JMSBridge {
 
    private MessageProducer targetProducer;
 
-   private BatchTimeChecker timeChecker;
+   private CountDownLatch batchTimeCheckerFinished;
+
+   private Future<?> batchTimeCheckerTask;
+
+   private CountDownLatch sourceReceiverFinished;
+
+   private Future<?> sourceReceiverTask;
 
    private ExecutorService executor;
 
@@ -418,17 +427,25 @@ public final class JMSBridgeImpl implements JMSBridge {
             ActiveMQJMSBridgeLogger.LOGGER.trace("Starting time checker 
thread");
          }
 
-         timeChecker = new BatchTimeChecker();
-
-         executor.execute(timeChecker);
          batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
 
+         batchTimeCheckerFinished = new CountDownLatch(1);
+
+         batchTimeCheckerTask = executor.submit(new BatchTimeChecker());
+
          if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
             ActiveMQJMSBridgeLogger.LOGGER.trace("Started time checker 
thread");
          }
+      } else {
+
+         batchTimeCheckerFinished = null;
+
+         batchTimeCheckerTask = null;
       }
 
-      executor.execute(new SourceReceiver());
+      sourceReceiverFinished = new CountDownLatch(1);
+
+      sourceReceiverTask = executor.submit(new SourceReceiver());
 
       if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
          ActiveMQJMSBridgeLogger.LOGGER.trace("Started " + this);
@@ -451,6 +468,43 @@ public final class JMSBridgeImpl implements JMSBridge {
 
    @Override
    public void stop() throws Exception {
+      stop(false);
+   }
+
+   private boolean awaitTaskCompletion(CountDownLatch finished, long time, 
TimeUnit timeUnit, String taskName) {
+      boolean taskCompleted;
+      try {
+         taskCompleted = finished.await(time, timeUnit);
+         if (!taskCompleted) {
+            ActiveMQJMSBridgeLogger.LOGGER.tracef("%s task on bridge %s wasn't 
able to finish", taskName, bridgeName);
+         }
+         return taskCompleted;
+      } catch (InterruptedException ie) {
+         ActiveMQJMSBridgeLogger.LOGGER.tracef("An interruption has happened 
on bridge %s while waiting %s task to finish", bridgeName, taskName);
+         return false;
+      }
+   }
+
+   private boolean awaitAll(long time, TimeUnit timeUnit, Pair<String, 
CountDownLatch>... namedTaskCompletions) {
+      long remainingNanos = timeUnit.toNanos(time);
+      boolean allFinished = true;
+      for (Pair<String, CountDownLatch> namedTaskCompletion : 
namedTaskCompletions) {
+         final CountDownLatch taskCompletion = namedTaskCompletion.getB();
+         if (taskCompletion != null) {
+            final String taskName = namedTaskCompletion.getA();
+            final long start = System.nanoTime();
+            final boolean taskCompleted = awaitTaskCompletion(taskCompletion, 
remainingNanos, TimeUnit.NANOSECONDS, taskName);
+            final long elapsed = System.nanoTime() - start;
+            if (!taskCompleted) {
+               allFinished = false;
+            }
+            remainingNanos = Math.max(0, remainingNanos - elapsed);
+         }
+      }
+      return allFinished;
+   }
+
+   private void stop(boolean isFailureHandler) throws Exception {
       synchronized (stoppingGuard) {
          if (stopping)
             return;
@@ -461,74 +515,119 @@ public final class JMSBridgeImpl implements JMSBridge {
          if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
             ActiveMQJMSBridgeLogger.LOGGER.trace("Stopping " + this);
          }
+         Connection sourceConn = this.sourceConn;
          if (!connectedSource && sourceConn != null) {
-            sourceConn.close();
+            try {
+               sourceConn.close();
+            } catch (Throwable t) {
+               ActiveMQJMSBridgeLogger.LOGGER.tracef("Failed to close source 
connection on bridge %s", t);
+            } finally {
+               sourceConn = null;
+            }
          }
+         Connection targetConn = this.targetConn;
          if (!connectedTarget && targetConn != null) {
-            targetConn.close();
+            try {
+               targetConn.close();
+            } catch (Throwable t) {
+               ActiveMQJMSBridgeLogger.LOGGER.tracef("Failed to close target 
connection on bridge %s", t);
+            } finally {
+               targetConn = null;
+            }
          }
+         final CountDownLatch sourceReceiverFinished = 
this.sourceReceiverFinished;
+         final Future<?> sourceReceiverTask = this.sourceReceiverTask;
+         final CountDownLatch batchTimeCheckerFinished = 
this.batchTimeCheckerFinished;
+         final Future<?> batchTimeCheckerTask = this.batchTimeCheckerTask;
+         this.sourceReceiverFinished = null;
+         this.sourceReceiverTask = null;
+         this.batchTimeCheckerFinished = null;
+         this.batchTimeCheckerTask = null;
          synchronized (lock) {
             started = false;
-
-            executor.shutdownNow();
+            if (!isFailureHandler) {
+               executor.shutdownNow();
+            } else {
+               if (sourceReceiverTask != null) {
+                  sourceReceiverTask.cancel(true);
+               }
+               if (batchTimeCheckerTask != null) {
+                  batchTimeCheckerTask.cancel(true);
+               }
+            }
          }
 
-         boolean ok = executor.awaitTermination(60, TimeUnit.SECONDS);
-
-         if (!ok) {
-            throw new Exception("fail to stop JMS Bridge");
+         final boolean ok;
+         if (!isFailureHandler) {
+            ok = executor.awaitTermination(60, TimeUnit.SECONDS);
+         } else {
+            ok = awaitAll(60, TimeUnit.SECONDS,
+                          new Pair<>("SourceReceiver", sourceReceiverFinished),
+                          new Pair<>("BatchTimeChecker", 
batchTimeCheckerFinished));
          }
 
-         if (tx != null) {
-            // Terminate any transaction
-            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
-               ActiveMQJMSBridgeLogger.LOGGER.trace("Rolling back remaining 
tx");
-            }
+         try {
 
-            stopSessionFailover();
+            if (!ok) {
+               throw new Exception("the bridge hasn't cleanly stopped: 
transactions, connections or messages could have leaked!");
+            }
 
-            try {
-               tx.rollback();
-               abortedMessageCount += messages.size();
-            } catch (Exception ignore) {
+            if (tx != null) {
+               // Terminate any transaction
                if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
-                  ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to rollback", 
ignore);
+                  ActiveMQJMSBridgeLogger.LOGGER.trace("Rolling back remaining 
tx");
+               }
+
+               try {
+                  stopSessionFailover();
+
+                  try {
+                     tx.rollback();
+                     abortedMessageCount += messages.size();
+                  } catch (Exception ignore) {
+                     if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
+                        ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to 
rollback", ignore);
+                     }
+                  }
+
+                  if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
+                     ActiveMQJMSBridgeLogger.LOGGER.trace("Rolled back 
remaining tx");
+                  }
+               } catch (Throwable t) {
+                  ActiveMQJMSBridgeLogger.LOGGER.trace("Failed 
stopSessionFailover", t);
                }
-            }
 
-            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
-               ActiveMQJMSBridgeLogger.LOGGER.trace("Rolled back remaining 
tx");
             }
-         }
 
-         if (sourceConn != null) {
-            try {
-               sourceConn.close();
-            } catch (Exception ignore) {
-               if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
-                  ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close source 
conn", ignore);
+            if (sourceConn != null) {
+               try {
+                  sourceConn.close();
+               } catch (Exception ignore) {
+                  ActiveMQJMSBridgeLogger.LOGGER.tracef("Failed to close 
source connection on bridge %s", ignore);
                }
             }
-         }
 
-         if (targetConn != null) {
-            try {
-               targetConn.close();
-            } catch (Exception ignore) {
-               if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
-                  ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close target 
conn", ignore);
+            if (targetConn != null) {
+               try {
+                  targetConn.close();
+               } catch (Exception ignore) {
+                  ActiveMQJMSBridgeLogger.LOGGER.tracef("Failed to close 
target connection on bridge %s", ignore);
                }
             }
-         }
 
-         if (messages.size() > 0) {
-            // Clear outstanding messages so they don't get retransmitted and 
duplicated on the other side of the bridge
-            ActiveMQJMSBridgeLogger.LOGGER.trace("Clearing up messages before 
stopping...");
-            messages.clear();
-         }
+            if (messages.size() > 0) {
+               // Clear outstanding messages so they don't get retransmitted 
and duplicated on the other side of the bridge
+               ActiveMQJMSBridgeLogger.LOGGER.trace("Clearing up messages 
before stopping...");
+               messages.clear();
+            }
 
-         if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
-            ActiveMQJMSBridgeLogger.LOGGER.trace("Stopped " + this);
+            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
+               ActiveMQJMSBridgeLogger.LOGGER.trace("Stopped " + this);
+            }
+         } finally {
+            if (isFailureHandler) {
+               executor.shutdownNow();
+            }
          }
       }
    }
@@ -1269,13 +1368,16 @@ public final class JMSBridgeImpl implements JMSBridge {
       }
    }
 
-   private void pause(final long interval) {
-      long start = System.currentTimeMillis();
-      while (System.currentTimeMillis() - start < failureRetryInterval) {
-         try {
-            Thread.sleep(failureRetryInterval);
-         } catch (InterruptedException ex) {
-         }
+   /**
+    * Pause the calling thread for the given {@code millis}: it returns {@code 
true} if not interrupted, {@code false} otherwise.
+    */
+   private static boolean pause(final long millis) {
+      assert millis >= 0;
+      try {
+         Thread.sleep(millis);
+         return true;
+      } catch (InterruptedException ex) {
+         return false;
       }
    }
 
@@ -1286,7 +1388,7 @@ public final class JMSBridgeImpl implements JMSBridge {
 
       int count = 0;
 
-      while (true && !stopping) {
+      while (!stopping) {
          boolean ok = setupJMSObjects();
 
          if (ok) {
@@ -1301,7 +1403,10 @@ public final class JMSBridgeImpl implements JMSBridge {
 
          
ActiveMQJMSBridgeLogger.LOGGER.failedToSetUpBridge(failureRetryInterval, 
bridgeName);
 
-         pause(failureRetryInterval);
+         if (!pause(failureRetryInterval)) {
+            ActiveMQJMSBridgeLogger.LOGGER.tracef("Interrupted while pausing 
the bridge %s", bridgeName);
+            return false;
+         }
       }
 
       // If we get here then we exceeded maxRetries
@@ -1649,87 +1754,88 @@ public final class JMSBridgeImpl implements JMSBridge {
     * to ensure that message delivery does not happen concurrently with
     * transaction enlistment of the XAResource (see HORNETQ-27)
     */
-   private final class SourceReceiver extends Thread {
-
-      SourceReceiver() {
-         super("jmsbridge-source-receiver-thread");
-      }
+   private final class SourceReceiver implements Runnable {
 
       @Override
       @SuppressWarnings("WaitNotInLoop")
       // both lock.wait(..) either returns, throws or continue, thus avoiding 
spurious wakes
       public void run() {
-         while (started) {
-            if (stopping) {
-               return;
-            }
-            synchronized (lock) {
-               if (paused || failed) {
-                  try {
-                     lock.wait(500);
-                  } catch (InterruptedException e) {
-                     if (stopping) {
-                        return;
-                     }
-                     throw new ActiveMQInterruptedException(e);
-                  }
-                  continue;
+         final CountDownLatch finished = sourceReceiverFinished;
+         try {
+            while (started) {
+               if (stopping) {
+                  return;
                }
-
-               Message msg = null;
-               try {
-                  msg = sourceConsumer.receive(1000);
-
-                  if (msg instanceof ActiveMQMessage) {
-                     // We need to check the buffer mainly in the case of 
LargeMessages
-                     // As we need to reconstruct the buffer before resending 
the message
-                     ((ActiveMQMessage) msg).checkBuffer();
-                  }
-               } catch (JMSException jmse) {
-                  if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
-                     ActiveMQJMSBridgeLogger.LOGGER.trace(this + " exception 
while receiving a message", jmse);
+               synchronized (lock) {
+                  if (paused || failed) {
+                     try {
+                        lock.wait(500);
+                     } catch (InterruptedException e) {
+                        if (stopping) {
+                           return;
+                        }
+                        throw new ActiveMQInterruptedException(e);
+                     }
+                     continue;
                   }
-               }
 
-               if (msg == null) {
+                  Message msg = null;
                   try {
-                     lock.wait(500);
-                  } catch (InterruptedException e) {
-                     if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
-                        ActiveMQJMSBridgeLogger.LOGGER.trace(this + " thread 
was interrupted");
+                     msg = sourceConsumer.receive(1000);
+
+                     if (msg instanceof ActiveMQMessage) {
+                        // We need to check the buffer mainly in the case of 
LargeMessages
+                        // As we need to reconstruct the buffer before 
resending the message
+                        ((ActiveMQMessage) msg).checkBuffer();
                      }
-                     if (stopping) {
-                        return;
+                  } catch (JMSException jmse) {
+                     if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
+                        ActiveMQJMSBridgeLogger.LOGGER.trace(this + " 
exception while receiving a message", jmse);
                      }
-                     throw new ActiveMQInterruptedException(e);
                   }
-                  continue;
-               }
 
-               if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
-                  ActiveMQJMSBridgeLogger.LOGGER.trace(this + " received 
message " + msg);
-               }
+                  if (msg == null) {
+                     try {
+                        lock.wait(500);
+                     } catch (InterruptedException e) {
+                        if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
+                           ActiveMQJMSBridgeLogger.LOGGER.trace(this + " 
thread was interrupted");
+                        }
+                        if (stopping) {
+                           return;
+                        }
+                        throw new ActiveMQInterruptedException(e);
+                     }
+                     continue;
+                  }
 
-               messages.add(msg);
+                  if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
+                     ActiveMQJMSBridgeLogger.LOGGER.trace(this + " received 
message " + msg);
+                  }
 
-               batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
+                  messages.add(msg);
 
-               if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
-                  ActiveMQJMSBridgeLogger.LOGGER.trace(this + " rescheduled 
batchExpiryTime to " + batchExpiryTime);
-               }
+                  batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
 
-               if (maxBatchSize != -1 && messages.size() >= maxBatchSize) {
                   if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
-                     ActiveMQJMSBridgeLogger.LOGGER.trace(this + " 
maxBatchSize has been reached so sending batch");
+                     ActiveMQJMSBridgeLogger.LOGGER.trace(this + " rescheduled 
batchExpiryTime to " + batchExpiryTime);
                   }
 
-                  sendBatch();
+                  if (maxBatchSize != -1 && messages.size() >= maxBatchSize) {
+                     if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
+                        ActiveMQJMSBridgeLogger.LOGGER.trace(this + " 
maxBatchSize has been reached so sending batch");
+                     }
 
-                  if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
-                     ActiveMQJMSBridgeLogger.LOGGER.trace(this + " sent 
batch");
+                     sendBatch();
+
+                     if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
+                        ActiveMQJMSBridgeLogger.LOGGER.trace(this + " sent 
batch");
+                     }
                   }
                }
             }
+         } finally {
+            finished.countDown();
          }
       }
    }
@@ -1764,8 +1870,9 @@ public final class JMSBridgeImpl implements JMSBridge {
          ActiveMQJMSBridgeLogger.LOGGER.errorConnectingBridge(bridgeName);
 
          try {
-            stop();
-         } catch (Exception ignore) {
+            stop(true);
+         } catch (Throwable ignore) {
+            ActiveMQJMSBridgeLogger.LOGGER.tracef("Failed to stop bridge %s 
from %s ", bridgeName, this.getClass().getSimpleName(), ignore);
          }
       }
 
@@ -1785,7 +1892,11 @@ public final class JMSBridgeImpl implements JMSBridge {
          if (maxRetries > 0 || maxRetries == -1) {
             ActiveMQJMSBridgeLogger.LOGGER.bridgeRetry(failureRetryInterval, 
bridgeName);
 
-            pause(failureRetryInterval);
+            if (!pause(failureRetryInterval)) {
+               ActiveMQJMSBridgeLogger.LOGGER.tracef("Interrupted while 
pausing the bridge %s", bridgeName);
+               failed();
+               return;
+            }
 
             // Now we try
             ok = setupJMSObjectsWithRetry();
@@ -1839,53 +1950,58 @@ public final class JMSBridgeImpl implements JMSBridge {
             ActiveMQJMSBridgeLogger.LOGGER.trace(this + " running");
          }
 
-         synchronized (lock) {
-            while (started) {
-               long toWait = batchExpiryTime - System.currentTimeMillis();
+         final CountDownLatch completed = batchTimeCheckerFinished;
+         try {
+            synchronized (lock) {
+               while (started) {
+                  long toWait = batchExpiryTime - System.currentTimeMillis();
 
-               if (toWait <= 0) {
-                  if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
-                     ActiveMQJMSBridgeLogger.LOGGER.trace(this + " waited 
enough");
-                  }
+                  if (toWait <= 0) {
+                     if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
+                        ActiveMQJMSBridgeLogger.LOGGER.trace(this + " waited 
enough");
+                     }
 
-                  synchronized (lock) {
-                     if (!failed && !messages.isEmpty()) {
-                        if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
-                           ActiveMQJMSBridgeLogger.LOGGER.trace(this + " got 
some messages so sending batch");
-                        }
+                     synchronized (lock) {
+                        if (!failed && !messages.isEmpty()) {
+                           if 
(ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
+                              ActiveMQJMSBridgeLogger.LOGGER.trace(this + " 
got some messages so sending batch");
+                           }
 
-                        sendBatch();
+                           sendBatch();
 
-                        if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
-                           ActiveMQJMSBridgeLogger.LOGGER.trace(this + " sent 
batch");
+                           if 
(ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
+                              ActiveMQJMSBridgeLogger.LOGGER.trace(this + " 
sent batch");
+                           }
                         }
                      }
-                  }
 
-                  batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
-               } else {
-                  try {
-                     if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
-                        ActiveMQJMSBridgeLogger.LOGGER.trace(this + " waiting 
for " + toWait);
-                     }
+                     batchExpiryTime = System.currentTimeMillis() + 
maxBatchTime;
+                  } else {
+                     try {
+                        if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
+                           ActiveMQJMSBridgeLogger.LOGGER.trace(this + " 
waiting for " + toWait);
+                        }
 
-                     lock.wait(toWait);
+                        lock.wait(toWait);
 
-                     if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
-                        ActiveMQJMSBridgeLogger.LOGGER.trace(this + " woke 
up");
-                     }
-                  } catch (InterruptedException e) {
-                     if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
-                        ActiveMQJMSBridgeLogger.LOGGER.trace(this + " thread 
was interrupted");
-                     }
-                     if (stopping) {
-                        return;
+                        if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
+                           ActiveMQJMSBridgeLogger.LOGGER.trace(this + " woke 
up");
+                        }
+                     } catch (InterruptedException e) {
+                        if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
+                           ActiveMQJMSBridgeLogger.LOGGER.trace(this + " 
thread was interrupted");
+                        }
+                        if (stopping) {
+                           return;
+                        }
+                        throw new ActiveMQInterruptedException(e);
                      }
-                     throw new ActiveMQInterruptedException(e);
-                  }
 
+                  }
                }
             }
+         } finally {
+            completed.countDown();
          }
       }
    }
@@ -2037,8 +2153,8 @@ public final class JMSBridgeImpl implements JMSBridge {
          }
 
          /*
-         * make sure we reset the connected flags
-         * */
+          * make sure we reset the connected flags
+          * */
          if (result == FailoverEventType.FAILOVER_COMPLETED) {
             if (isSource) {
                connectedSource = true;

Reply via email to