This is an automated email from the ASF dual-hosted git repository. jbertram pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push: new dc1cfa3 ARTEMIS-2290 JMSBridgeImpl::stop is failing when called from FailureHandler new b62e081 This closes #2598 dc1cfa3 is described below commit dc1cfa3536087d8dd29b5462775e8a95809ed883 Author: Francesco Nigro <nigro....@gmail.com> AuthorDate: Tue Mar 19 12:10:07 2019 +0100 ARTEMIS-2290 JMSBridgeImpl::stop is failing when called from FailureHandler --- .../artemis/jms/bridge/impl/JMSBridgeImpl.java | 428 +++++++++++++-------- 1 file changed, 272 insertions(+), 156 deletions(-) diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java index a5e1c0e..2da550b 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java @@ -45,13 +45,16 @@ import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; import java.util.ServiceLoader; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; +import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.client.FailoverEventListener; import org.apache.activemq.artemis.api.core.client.FailoverEventType; import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; @@ -113,8 +116,8 @@ public final class JMSBridgeImpl implements JMSBridge { private boolean started; - private final Object stoppingGuard = new Object(); - private boolean stopping = false; + private static final Object stoppingGuard = new Object(); + private volatile boolean stopping = false; private final LinkedList<Message> messages; @@ -142,7 +145,13 @@ public final class JMSBridgeImpl implements JMSBridge { private MessageProducer targetProducer; - private BatchTimeChecker timeChecker; + private CountDownLatch batchTimeCheckerFinished; + + private Future<?> batchTimeCheckerTask; + + private CountDownLatch sourceReceiverFinished; + + private Future<?> sourceReceiverTask; private ExecutorService executor; @@ -418,17 +427,25 @@ public final class JMSBridgeImpl implements JMSBridge { ActiveMQJMSBridgeLogger.LOGGER.trace("Starting time checker thread"); } - timeChecker = new BatchTimeChecker(); - - executor.execute(timeChecker); batchExpiryTime = System.currentTimeMillis() + maxBatchTime; + batchTimeCheckerFinished = new CountDownLatch(1); + + batchTimeCheckerTask = executor.submit(new BatchTimeChecker()); + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { ActiveMQJMSBridgeLogger.LOGGER.trace("Started time checker thread"); } + } else { + + batchTimeCheckerFinished = null; + + batchTimeCheckerTask = null; } - executor.execute(new SourceReceiver()); + sourceReceiverFinished = new CountDownLatch(1); + + sourceReceiverTask = executor.submit(new SourceReceiver()); if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { ActiveMQJMSBridgeLogger.LOGGER.trace("Started " + this); @@ -451,6 +468,43 @@ public final class JMSBridgeImpl implements JMSBridge { @Override public void stop() throws Exception { + stop(false); + } + + private boolean awaitTaskCompletion(CountDownLatch finished, long time, TimeUnit timeUnit, String taskName) { + boolean taskCompleted; + try { + taskCompleted = finished.await(time, timeUnit); + if (!taskCompleted) { + ActiveMQJMSBridgeLogger.LOGGER.tracef("%s task on bridge %s wasn't able to finish", taskName, bridgeName); + } + return taskCompleted; + } catch (InterruptedException ie) { + ActiveMQJMSBridgeLogger.LOGGER.tracef("An interruption has happened on bridge %s while waiting %s task to finish", bridgeName, taskName); + return false; + } + } + + private boolean awaitAll(long time, TimeUnit timeUnit, Pair<String, CountDownLatch>... namedTaskCompletions) { + long remainingNanos = timeUnit.toNanos(time); + boolean allFinished = true; + for (Pair<String, CountDownLatch> namedTaskCompletion : namedTaskCompletions) { + final CountDownLatch taskCompletion = namedTaskCompletion.getB(); + if (taskCompletion != null) { + final String taskName = namedTaskCompletion.getA(); + final long start = System.nanoTime(); + final boolean taskCompleted = awaitTaskCompletion(taskCompletion, remainingNanos, TimeUnit.NANOSECONDS, taskName); + final long elapsed = System.nanoTime() - start; + if (!taskCompleted) { + allFinished = false; + } + remainingNanos = Math.max(0, remainingNanos - elapsed); + } + } + return allFinished; + } + + private void stop(boolean isFailureHandler) throws Exception { synchronized (stoppingGuard) { if (stopping) return; @@ -461,74 +515,119 @@ public final class JMSBridgeImpl implements JMSBridge { if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { ActiveMQJMSBridgeLogger.LOGGER.trace("Stopping " + this); } + Connection sourceConn = this.sourceConn; if (!connectedSource && sourceConn != null) { - sourceConn.close(); + try { + sourceConn.close(); + } catch (Throwable t) { + ActiveMQJMSBridgeLogger.LOGGER.tracef("Failed to close source connection on bridge %s", t); + } finally { + sourceConn = null; + } } + Connection targetConn = this.targetConn; if (!connectedTarget && targetConn != null) { - targetConn.close(); + try { + targetConn.close(); + } catch (Throwable t) { + ActiveMQJMSBridgeLogger.LOGGER.tracef("Failed to close target connection on bridge %s", t); + } finally { + targetConn = null; + } } + final CountDownLatch sourceReceiverFinished = this.sourceReceiverFinished; + final Future<?> sourceReceiverTask = this.sourceReceiverTask; + final CountDownLatch batchTimeCheckerFinished = this.batchTimeCheckerFinished; + final Future<?> batchTimeCheckerTask = this.batchTimeCheckerTask; + this.sourceReceiverFinished = null; + this.sourceReceiverTask = null; + this.batchTimeCheckerFinished = null; + this.batchTimeCheckerTask = null; synchronized (lock) { started = false; - - executor.shutdownNow(); + if (!isFailureHandler) { + executor.shutdownNow(); + } else { + if (sourceReceiverTask != null) { + sourceReceiverTask.cancel(true); + } + if (batchTimeCheckerTask != null) { + batchTimeCheckerTask.cancel(true); + } + } } - boolean ok = executor.awaitTermination(60, TimeUnit.SECONDS); - - if (!ok) { - throw new Exception("fail to stop JMS Bridge"); + final boolean ok; + if (!isFailureHandler) { + ok = executor.awaitTermination(60, TimeUnit.SECONDS); + } else { + ok = awaitAll(60, TimeUnit.SECONDS, + new Pair<>("SourceReceiver", sourceReceiverFinished), + new Pair<>("BatchTimeChecker", batchTimeCheckerFinished)); } - if (tx != null) { - // Terminate any transaction - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace("Rolling back remaining tx"); - } + try { - stopSessionFailover(); + if (!ok) { + throw new Exception("the bridge hasn't cleanly stopped: transactions, connections or messages could have leaked!"); + } - try { - tx.rollback(); - abortedMessageCount += messages.size(); - } catch (Exception ignore) { + if (tx != null) { + // Terminate any transaction if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to rollback", ignore); + ActiveMQJMSBridgeLogger.LOGGER.trace("Rolling back remaining tx"); + } + + try { + stopSessionFailover(); + + try { + tx.rollback(); + abortedMessageCount += messages.size(); + } catch (Exception ignore) { + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to rollback", ignore); + } + } + + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace("Rolled back remaining tx"); + } + } catch (Throwable t) { + ActiveMQJMSBridgeLogger.LOGGER.trace("Failed stopSessionFailover", t); } - } - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace("Rolled back remaining tx"); } - } - if (sourceConn != null) { - try { - sourceConn.close(); - } catch (Exception ignore) { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close source conn", ignore); + if (sourceConn != null) { + try { + sourceConn.close(); + } catch (Exception ignore) { + ActiveMQJMSBridgeLogger.LOGGER.tracef("Failed to close source connection on bridge %s", ignore); } } - } - if (targetConn != null) { - try { - targetConn.close(); - } catch (Exception ignore) { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close target conn", ignore); + if (targetConn != null) { + try { + targetConn.close(); + } catch (Exception ignore) { + ActiveMQJMSBridgeLogger.LOGGER.tracef("Failed to close target connection on bridge %s", ignore); } } - } - if (messages.size() > 0) { - // Clear outstanding messages so they don't get retransmitted and duplicated on the other side of the bridge - ActiveMQJMSBridgeLogger.LOGGER.trace("Clearing up messages before stopping..."); - messages.clear(); - } + if (messages.size() > 0) { + // Clear outstanding messages so they don't get retransmitted and duplicated on the other side of the bridge + ActiveMQJMSBridgeLogger.LOGGER.trace("Clearing up messages before stopping..."); + messages.clear(); + } - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace("Stopped " + this); + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace("Stopped " + this); + } + } finally { + if (isFailureHandler) { + executor.shutdownNow(); + } } } } @@ -1269,13 +1368,16 @@ public final class JMSBridgeImpl implements JMSBridge { } } - private void pause(final long interval) { - long start = System.currentTimeMillis(); - while (System.currentTimeMillis() - start < failureRetryInterval) { - try { - Thread.sleep(failureRetryInterval); - } catch (InterruptedException ex) { - } + /** + * Pause the calling thread for the given {@code millis}: it returns {@code true} if not interrupted, {@code false} otherwise. + */ + private static boolean pause(final long millis) { + assert millis >= 0; + try { + Thread.sleep(millis); + return true; + } catch (InterruptedException ex) { + return false; } } @@ -1286,7 +1388,7 @@ public final class JMSBridgeImpl implements JMSBridge { int count = 0; - while (true && !stopping) { + while (!stopping) { boolean ok = setupJMSObjects(); if (ok) { @@ -1301,7 +1403,10 @@ public final class JMSBridgeImpl implements JMSBridge { ActiveMQJMSBridgeLogger.LOGGER.failedToSetUpBridge(failureRetryInterval, bridgeName); - pause(failureRetryInterval); + if (!pause(failureRetryInterval)) { + ActiveMQJMSBridgeLogger.LOGGER.tracef("Interrupted while pausing the bridge %s", bridgeName); + return false; + } } // If we get here then we exceeded maxRetries @@ -1649,87 +1754,88 @@ public final class JMSBridgeImpl implements JMSBridge { * to ensure that message delivery does not happen concurrently with * transaction enlistment of the XAResource (see HORNETQ-27) */ - private final class SourceReceiver extends Thread { - - SourceReceiver() { - super("jmsbridge-source-receiver-thread"); - } + private final class SourceReceiver implements Runnable { @Override @SuppressWarnings("WaitNotInLoop") // both lock.wait(..) either returns, throws or continue, thus avoiding spurious wakes public void run() { - while (started) { - if (stopping) { - return; - } - synchronized (lock) { - if (paused || failed) { - try { - lock.wait(500); - } catch (InterruptedException e) { - if (stopping) { - return; - } - throw new ActiveMQInterruptedException(e); - } - continue; + final CountDownLatch finished = sourceReceiverFinished; + try { + while (started) { + if (stopping) { + return; } - - Message msg = null; - try { - msg = sourceConsumer.receive(1000); - - if (msg instanceof ActiveMQMessage) { - // We need to check the buffer mainly in the case of LargeMessages - // As we need to reconstruct the buffer before resending the message - ((ActiveMQMessage) msg).checkBuffer(); - } - } catch (JMSException jmse) { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " exception while receiving a message", jmse); + synchronized (lock) { + if (paused || failed) { + try { + lock.wait(500); + } catch (InterruptedException e) { + if (stopping) { + return; + } + throw new ActiveMQInterruptedException(e); + } + continue; } - } - if (msg == null) { + Message msg = null; try { - lock.wait(500); - } catch (InterruptedException e) { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " thread was interrupted"); + msg = sourceConsumer.receive(1000); + + if (msg instanceof ActiveMQMessage) { + // We need to check the buffer mainly in the case of LargeMessages + // As we need to reconstruct the buffer before resending the message + ((ActiveMQMessage) msg).checkBuffer(); } - if (stopping) { - return; + } catch (JMSException jmse) { + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " exception while receiving a message", jmse); } - throw new ActiveMQInterruptedException(e); } - continue; - } - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " received message " + msg); - } + if (msg == null) { + try { + lock.wait(500); + } catch (InterruptedException e) { + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " thread was interrupted"); + } + if (stopping) { + return; + } + throw new ActiveMQInterruptedException(e); + } + continue; + } - messages.add(msg); + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " received message " + msg); + } - batchExpiryTime = System.currentTimeMillis() + maxBatchTime; + messages.add(msg); - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " rescheduled batchExpiryTime to " + batchExpiryTime); - } + batchExpiryTime = System.currentTimeMillis() + maxBatchTime; - if (maxBatchSize != -1 && messages.size() >= maxBatchSize) { if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " maxBatchSize has been reached so sending batch"); + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " rescheduled batchExpiryTime to " + batchExpiryTime); } - sendBatch(); + if (maxBatchSize != -1 && messages.size() >= maxBatchSize) { + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " maxBatchSize has been reached so sending batch"); + } - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " sent batch"); + sendBatch(); + + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " sent batch"); + } } } } + } finally { + finished.countDown(); } } } @@ -1764,8 +1870,9 @@ public final class JMSBridgeImpl implements JMSBridge { ActiveMQJMSBridgeLogger.LOGGER.errorConnectingBridge(bridgeName); try { - stop(); - } catch (Exception ignore) { + stop(true); + } catch (Throwable ignore) { + ActiveMQJMSBridgeLogger.LOGGER.tracef("Failed to stop bridge %s from %s ", bridgeName, this.getClass().getSimpleName(), ignore); } } @@ -1785,7 +1892,11 @@ public final class JMSBridgeImpl implements JMSBridge { if (maxRetries > 0 || maxRetries == -1) { ActiveMQJMSBridgeLogger.LOGGER.bridgeRetry(failureRetryInterval, bridgeName); - pause(failureRetryInterval); + if (!pause(failureRetryInterval)) { + ActiveMQJMSBridgeLogger.LOGGER.tracef("Interrupted while pausing the bridge %s", bridgeName); + failed(); + return; + } // Now we try ok = setupJMSObjectsWithRetry(); @@ -1839,53 +1950,58 @@ public final class JMSBridgeImpl implements JMSBridge { ActiveMQJMSBridgeLogger.LOGGER.trace(this + " running"); } - synchronized (lock) { - while (started) { - long toWait = batchExpiryTime - System.currentTimeMillis(); + final CountDownLatch completed = batchTimeCheckerFinished; + try { + synchronized (lock) { + while (started) { + long toWait = batchExpiryTime - System.currentTimeMillis(); - if (toWait <= 0) { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " waited enough"); - } + if (toWait <= 0) { + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " waited enough"); + } - synchronized (lock) { - if (!failed && !messages.isEmpty()) { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " got some messages so sending batch"); - } + synchronized (lock) { + if (!failed && !messages.isEmpty()) { + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " got some messages so sending batch"); + } - sendBatch(); + sendBatch(); - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " sent batch"); + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " sent batch"); + } } } - } - batchExpiryTime = System.currentTimeMillis() + maxBatchTime; - } else { - try { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " waiting for " + toWait); - } + batchExpiryTime = System.currentTimeMillis() + maxBatchTime; + } else { + try { + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " waiting for " + toWait); + } - lock.wait(toWait); + lock.wait(toWait); - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " woke up"); - } - } catch (InterruptedException e) { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace(this + " thread was interrupted"); - } - if (stopping) { - return; + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " woke up"); + } + } catch (InterruptedException e) { + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace(this + " thread was interrupted"); + } + if (stopping) { + return; + } + throw new ActiveMQInterruptedException(e); } - throw new ActiveMQInterruptedException(e); - } + } } } + } finally { + completed.countDown(); } } } @@ -2037,8 +2153,8 @@ public final class JMSBridgeImpl implements JMSBridge { } /* - * make sure we reset the connected flags - * */ + * make sure we reset the connected flags + * */ if (result == FailoverEventType.FAILOVER_COMPLETED) { if (isSource) { connectedSource = true;