ARTEMIS-1393 CriticalAnalyzer timeout uses System::currentTimeMillis The timeout logic is changed to use System::nanoTime, which is less sensitive to OS clock changes. The volatile sets on CriticalMeasure are replaced with the cheaper lazySet.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9cf222be Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9cf222be Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9cf222be Branch: refs/heads/master Commit: 9cf222be116192a8f87eb9e55fc5d0246dc5cf3f Parents: 99b2e4c Author: Francesco Nigro <nigro....@gmail.com> Authored: Wed Sep 6 18:08:42 2017 +0200 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Thu Sep 7 10:11:35 2017 -0400 ---------------------------------------------------------------------- .../utils/critical/CriticalAnalyzer.java | 10 +++--- .../utils/critical/CriticalAnalyzerImpl.java | 32 ++++++++++---------- .../utils/critical/CriticalComponent.java | 2 +- .../artemis/utils/critical/CriticalMeasure.java | 30 ++++++++++++------ .../utils/critical/EmptyCriticalAnalyzer.java | 14 +++++---- .../utils/critical/CriticalAnalyzerTest.java | 29 +++++++++++++++--- .../core/server/impl/ActiveMQServerImpl.java | 24 +++++++-------- 7 files changed, 89 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cf222be/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java index 6b5a436..844f9f0 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.utils.critical; +import java.util.concurrent.TimeUnit; + 
import org.apache.activemq.artemis.core.server.ActiveMQComponent; public interface CriticalAnalyzer extends ActiveMQComponent { @@ -29,13 +31,13 @@ public interface CriticalAnalyzer extends ActiveMQComponent { void remove(CriticalComponent component); - CriticalAnalyzer setCheckTime(long timeout); + CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit); - long getCheckTime(); + long getCheckTimeNanoSeconds(); - CriticalAnalyzer setTimeout(long timeout); + CriticalAnalyzer setTimeout(long timeout, TimeUnit unit); - long getTimeout(); + long getTimeout(TimeUnit unit); CriticalAnalyzer addAction(CriticalAction action); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cf222be/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java index c583f2a..ef31fe8 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java @@ -29,9 +29,9 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer { private final Logger logger = Logger.getLogger(CriticalAnalyzer.class); - private volatile long timeout; + private volatile long timeoutNanoSeconds; - private volatile long checkTime; + private volatile long checkTimeNanoSeconds; @Override public void clear() { @@ -63,31 +63,31 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer { } @Override - public CriticalAnalyzer setCheckTime(long timeout) { - this.checkTime = timeout; + public CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit) { + this.checkTimeNanoSeconds = timeout; return this; } @Override - public long 
getCheckTime() { - if (checkTime == 0) { - checkTime = getTimeout() / 2; + public long getCheckTimeNanoSeconds() { + if (checkTimeNanoSeconds == 0) { + checkTimeNanoSeconds = getTimeout(TimeUnit.NANOSECONDS) / 2; } - return checkTime; + return checkTimeNanoSeconds; } @Override - public CriticalAnalyzer setTimeout(long timeout) { - this.timeout = timeout; + public CriticalAnalyzer setTimeout(long timeout, TimeUnit unit) { + this.timeoutNanoSeconds = unit.toNanos(timeout); return this; } @Override - public long getTimeout() { - if (timeout == 0) { - timeout = TimeUnit.MINUTES.toMillis(2); + public long getTimeout(TimeUnit unit) { + if (timeoutNanoSeconds == 0) { + timeoutNanoSeconds = TimeUnit.MINUTES.toNanos(2); } - return timeout; + return unit.convert(timeoutNanoSeconds, TimeUnit.NANOSECONDS); } @Override @@ -103,7 +103,7 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer { try { for (CriticalComponent component : components) { - if (component.isExpired(timeout)) { + if (component.isExpired(timeoutNanoSeconds)) { fireAction(component); // no need to keep running if there's already a component failed return; @@ -142,7 +142,7 @@ public class CriticalAnalyzerImpl implements CriticalAnalyzer { public void run() { try { while (true) { - if (running.tryAcquire(getCheckTime(), TimeUnit.MILLISECONDS)) { + if (running.tryAcquire(getCheckTimeNanoSeconds(), TimeUnit.NANOSECONDS)) { running.release(); // this means that the server has been stopped as we could acquire the semaphore... 
returning now break; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cf222be/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java index a2459dd..367e9c5 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalComponent.java @@ -20,7 +20,7 @@ package org.apache.activemq.artemis.utils.critical; /** * A Critical component enters and leaves a critical state. * You update a long every time you enter a critical path - * you update a different long with a System.currentMillis every time you leave that path. + * you update a different long with a System.nanoTime every time you leave that path. * * If the enterCritical > leaveCritical at any point, then you need to measure the timeout. * if the system stops responding, then you have something irresponsive at the system. 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cf222be/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java index b853dc5..5b78a4a 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalMeasure.java @@ -17,36 +17,48 @@ package org.apache.activemq.artemis.utils.critical; -import org.jboss.logging.Logger; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; public class CriticalMeasure { - private static final Logger logger = Logger.getLogger(CriticalMeasure.class); + //uses updaters to avoid creates many AtomicLong instances + private static final AtomicLongFieldUpdater<CriticalMeasure> TIME_ENTER_UPDATER = AtomicLongFieldUpdater.newUpdater(CriticalMeasure.class, "timeEnter"); + private static final AtomicLongFieldUpdater<CriticalMeasure> TIME_LEFT_UPDATER = AtomicLongFieldUpdater.newUpdater(CriticalMeasure.class, "timeLeft"); private volatile long timeEnter; private volatile long timeLeft; + public CriticalMeasure() { + //prefer this approach instead of using some fixed value because System::nanoTime could change sign + //with long running processes + enterCritical(); + leaveCritical(); + } + public void enterCritical() { - timeEnter = System.currentTimeMillis(); + //prefer lazySet in order to avoid heavy-weight full barriers on x86 + TIME_ENTER_UPDATER.lazySet(this, System.nanoTime()); } public void leaveCritical() { - timeLeft = System.currentTimeMillis(); + TIME_LEFT_UPDATER.lazySet(this, System.nanoTime()); } public boolean isExpired(long timeout) { - if (timeEnter > timeLeft) { - return 
System.currentTimeMillis() - timeEnter > timeout; + final long timeLeft = TIME_LEFT_UPDATER.get(this); + final long timeEnter = TIME_ENTER_UPDATER.get(this); + //due to how System::nanoTime works is better to use differences to prevent numerical overflow while comparing + if (timeLeft - timeEnter < 0) { + return System.nanoTime() - timeEnter > timeout; } - return false; } public long enterTime() { - return timeEnter; + return TIME_ENTER_UPDATER.get(this); } public long leaveTime() { - return timeLeft; + return TIME_LEFT_UPDATER.get(this); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cf222be/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java index 4cf23a9..a5064ce 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java @@ -17,6 +17,8 @@ package org.apache.activemq.artemis.utils.critical; +import java.util.concurrent.TimeUnit; + public class EmptyCriticalAnalyzer implements CriticalAnalyzer { private static final EmptyCriticalAnalyzer instance = new EmptyCriticalAnalyzer(); @@ -59,22 +61,22 @@ public class EmptyCriticalAnalyzer implements CriticalAnalyzer { } @Override - public CriticalAnalyzer setCheckTime(long timeout) { - return this; + public CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit) { + return null; } @Override - public long getCheckTime() { + public long getCheckTimeNanoSeconds() { return 0; } @Override - public CriticalAnalyzer setTimeout(long timeout) { - return this; + public CriticalAnalyzer 
setTimeout(long timeout, TimeUnit unit) { + return null; } @Override - public long getTimeout() { + public long getTimeout(TimeUnit unit) { return 0; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cf222be/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java index 638eb61..d8ebaf3 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerTest.java @@ -42,7 +42,7 @@ public class CriticalAnalyzerTest { @Test public void testAction() throws Exception { - analyzer = new CriticalAnalyzerImpl().setTimeout(100).setCheckTime(50); + analyzer = new CriticalAnalyzerImpl().setTimeout(100, TimeUnit.MILLISECONDS).setCheckTime(50, TimeUnit.MILLISECONDS); analyzer.add(new CriticalComponent() { @Override public boolean isExpired(long timeout) { @@ -66,7 +66,7 @@ public class CriticalAnalyzerTest { @Test public void testActionOnImpl() throws Exception { - analyzer = new CriticalAnalyzerImpl().setTimeout(10).setCheckTime(5); + analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS); CriticalComponent component = new CriticalComponentImpl(analyzer, 2); analyzer.add(component); @@ -89,8 +89,29 @@ public class CriticalAnalyzerTest { } @Test + public void testEnterNoLeaveNoExpire() throws Exception { + analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS); + CriticalComponent component = new CriticalComponentImpl(analyzer, 2); + component.enterCritical(0); + 
Assert.assertFalse(component.isExpired(TimeUnit.MINUTES.toNanos(1))); + analyzer.stop(); + + } + + @Test + public void testEnterNoLeaveExpire() throws Exception { + analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS); + CriticalComponent component = new CriticalComponentImpl(analyzer, 2); + component.enterCritical(0); + Thread.sleep(50); + Assert.assertTrue(component.isExpired(0)); + analyzer.stop(); + + } + + @Test public void testNegative() throws Exception { - analyzer = new CriticalAnalyzerImpl().setTimeout(10).setCheckTime(5); + analyzer = new CriticalAnalyzerImpl().setTimeout(10, TimeUnit.MILLISECONDS).setCheckTime(5, TimeUnit.MILLISECONDS); CriticalComponent component = new CriticalComponentImpl(analyzer, 1); analyzer.add(component); @@ -111,4 +132,4 @@ public class CriticalAnalyzerTest { analyzer.stop(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9cf222be/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 15aca09..68e5559 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -506,7 +506,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { /** Calling this for cases where the server was stopped and now is being restarted... 
failback, etc...*/ this.analyzer.clear(); - this.getCriticalAnalyzer().setCheckTime(configuration.getCriticalAnalyzerCheckPeriod()).setTimeout(configuration.getCriticalAnalyzerTimeout()); + this.getCriticalAnalyzer().setCheckTime(configuration.getCriticalAnalyzerCheckPeriod(), TimeUnit.MILLISECONDS).setTimeout(configuration.getCriticalAnalyzerTimeout(), TimeUnit.MILLISECONDS); if (configuration.isCriticalAnalyzer()) { this.getCriticalAnalyzer().start(); @@ -1437,7 +1437,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { checkSessionLimit(validatedUser); callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateSession(name, username, minLargeMessageSize, connection, - autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, context, prefixes) : null); + autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, autoCreateQueues, context, prefixes) : null); final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues, prefixes); @@ -1838,7 +1838,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { } callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeDestroyQueue(queueName, session, checkConsumerCount, - removeConsumers, autoDeleteAddress) : null); + removeConsumers, autoDeleteAddress) : null); addressSettingsRepository.clearCache(); @@ -1882,7 +1882,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { callPostQueueDeletionCallbacks(address, queueName); callBrokerPlugins(hasBrokerPlugins() ? 
plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount, - removeConsumers, autoDeleteAddress) : null); + removeConsumers, autoDeleteAddress) : null); } @Override @@ -2456,13 +2456,13 @@ public class ActiveMQServerImpl implements ActiveMQServer { private void undeployAddressesAndQueueNotInConfiguration(Configuration configuration) throws Exception { Set<String> addressesInConfig = configuration.getAddressConfigurations().stream() - .map(CoreAddressConfiguration::getName) - .collect(Collectors.toSet()); + .map(CoreAddressConfiguration::getName) + .collect(Collectors.toSet()); Set<String> queuesInConfig = configuration.getAddressConfigurations().stream() - .map(CoreAddressConfiguration::getQueueConfigurations) - .flatMap(List::stream).map(CoreQueueConfiguration::getName) - .collect(Collectors.toSet()); + .map(CoreAddressConfiguration::getQueueConfigurations) + .flatMap(List::stream).map(CoreQueueConfiguration::getName) + .collect(Collectors.toSet()); for (SimpleString addressName : listAddressNames()) { AddressSettings addressSettings = getAddressSettingsRepository().getMatch(addressName.toString()); @@ -2521,8 +2521,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { Queue queue = updateQueue(config.getName(), config.getRoutingType(), config.getMaxConsumers(), config.getPurgeOnNoConsumers()); if (queue == null) { queue = createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), - queueName, SimpleString.toSimpleString(config.getFilterString()), null, - config.isDurable(), false, true, false, false, config.getMaxConsumers(), config.getPurgeOnNoConsumers(), true); + queueName, SimpleString.toSimpleString(config.getFilterString()), null, + config.isDurable(), false, true, false, false, config.getMaxConsumers(), config.getPurgeOnNoConsumers(), true); } return queue; } @@ -2990,4 +2990,4 @@ public class ActiveMQServerImpl implements ActiveMQServer { } } -} +} \ No newline at end of file