Repository: activemq-artemis
Updated Branches:
  refs/heads/master f91432eec -> 646e55514


NO-JIRA Test fixes

- LargeServerMessageImpl.finalize is eventually causing deadlocks
- CoreMessage needs to check properties before decoding
- PagingTest tweaks
- ServerLocatorImpl can deadlock eventually, avoiding a lock and using actors
- ActiveMQServerImpl.finalize is also evil and can cause deadlocks on the 
testsuite
- MqttClusterRemoteSubscribeTest needs to setup the Address now on the setup


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b5bf5afd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b5bf5afd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b5bf5afd

Branch: refs/heads/master
Commit: b5bf5afde7b767683e71956b49cc6ec8f94c10b8
Parents: f91432e
Author: Clebert Suconic <clebertsuco...@apache.org>
Authored: Wed Feb 14 10:15:01 2018 -0500
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Wed Feb 14 10:56:44 2018 -0500

----------------------------------------------------------------------
 .../core/client/impl/ServerLocatorImpl.java     | 34 +++++++++++++++++---
 .../artemis/core/message/impl/CoreMessage.java  |  3 +-
 .../impl/journal/LargeServerMessageImpl.java    |  6 ----
 .../core/server/impl/ActiveMQServerImpl.java    | 11 -------
 .../MqttClusterRemoteSubscribeTest.java         |  1 +
 .../tests/integration/paging/PagingTest.java    | 14 ++------
 6 files changed, 35 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5bf5afd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index 02c17c6..978cc39 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -67,6 +67,7 @@ import 
org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
 import org.apache.activemq.artemis.utils.ClassloadingUtil;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.apache.activemq.artemis.utils.actors.Actor;
 import 
org.apache.activemq.artemis.utils.uri.FluentPropertyBeanIntrospectorWithIgnores;
 import org.jboss.logging.Logger;
 
@@ -199,6 +200,8 @@ public final class ServerLocatorImpl implements 
ServerLocatorInternal, Discovery
 
    private Executor startExecutor;
 
+   private Actor<Long> updateArrayActor;
+
    private AfterConnectInternalListener afterConnectListener;
 
    private String groupID;
@@ -251,6 +254,8 @@ public final class ServerLocatorImpl implements 
ServerLocatorInternal, Discovery
 
          scheduledThreadPool = 
Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
       }
+
+      this.updateArrayActor = new Actor<>(threadPool, 
this::internalUpdateArray);
    }
 
    @Override
@@ -534,6 +539,8 @@ public final class ServerLocatorImpl implements 
ServerLocatorInternal, Discovery
    private TransportConfiguration selectConnector() {
       Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;
 
+      flushTopology();
+
       synchronized (topologyArrayGuard) {
          usedTopology = topologyArray;
       }
@@ -743,6 +750,8 @@ public final class ServerLocatorImpl implements 
ServerLocatorInternal, Discovery
 
       initialise();
 
+      flushTopology();
+
       if (this.getNumInitialConnectors() == 0 && discoveryGroup != null) {
          // Wait for an initial broadcast to give us at least one node in the 
cluster
          long timeout = clusterConnection ? 0 : 
discoveryGroupConfiguration.getDiscoveryInitialWaitTimeout();
@@ -812,6 +821,12 @@ public final class ServerLocatorImpl implements 
ServerLocatorInternal, Discovery
       return factory;
    }
 
+   public void flushTopology() {
+      if (updateArrayActor != null) {
+         updateArrayActor.flush(10, TimeUnit.SECONDS);
+      }
+   }
+
    @Override
    public boolean isHA() {
       return ha;
@@ -1426,14 +1441,14 @@ public final class ServerLocatorImpl implements 
ServerLocatorInternal, Discovery
       topology.removeMember(eventTime, nodeID);
 
       if (clusterConnection) {
-         updateArraysAndPairs();
+         updateArraysAndPairs(eventTime);
       } else {
          if (topology.isEmpty()) {
             // Resetting the topology to its original condition as it was 
brand new
             receivedTopology = false;
             topologyArray = null;
          } else {
-            updateArraysAndPairs();
+            updateArraysAndPairs(eventTime);
 
             if (topology.nodes() == 1 && topology.getMember(this.nodeID) != 
null) {
                // Resetting the topology to its original condition as it was 
brand new
@@ -1472,7 +1487,7 @@ public final class ServerLocatorImpl implements 
ServerLocatorInternal, Discovery
          }
       }
 
-      updateArraysAndPairs();
+      updateArraysAndPairs(uniqueEventID);
 
       if (last) {
          receivedTopology = true;
@@ -1496,7 +1511,16 @@ public final class ServerLocatorImpl implements 
ServerLocatorInternal, Discovery
    }
 
    @SuppressWarnings("unchecked")
-   private void updateArraysAndPairs() {
+   private void updateArraysAndPairs(long time) {
+      if (updateArrayActor == null) {
+         // if for some reason we don't have an actor, just go straight
+         internalUpdateArray(time);
+      } else {
+         updateArrayActor.act(time);
+      }
+   }
+
+   private void internalUpdateArray(long time) {
       synchronized (topologyArrayGuard) {
          Collection<TopologyMemberImpl> membersCopy = topology.getMembers();
 
@@ -1506,7 +1530,7 @@ public final class ServerLocatorImpl implements 
ServerLocatorInternal, Discovery
          for (TopologyMemberImpl pair : membersCopy) {
             Pair<TransportConfiguration, TransportConfiguration> 
transportConfigs = pair.getConnector();
             topologyArrayLocal[count++] = new 
Pair<>(protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getA()),
-                    
protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getB()));
+                                                     
protocolManagerFactory.adaptTransportConfiguration(transportConfigs.getB()));
          }
 
          this.topologyArray = topologyArrayLocal;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5bf5afd/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 9119a0d..2c570b9 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -369,6 +369,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
 
    @Override
    public Message copy() {
+      checkProperties();
       checkEncode();
       return new CoreMessage(this);
    }
@@ -936,8 +937,8 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
    @Override
    public CoreMessage putObjectProperty(final SimpleString key,
                                         final Object value) throws 
ActiveMQPropertyConversionException {
-      messageChanged();
       checkProperties();
+      messageChanged();
       TypedProperties.setObjectProperty(key, value, properties);
       return this;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5bf5afd/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 287b261..9a2e285 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -362,12 +362,6 @@ public final class LargeServerMessageImpl extends 
CoreMessage implements LargeSe
       }
    }
 
-   @Override
-   protected void finalize() throws Throwable {
-      releaseResources();
-      super.finalize();
-   }
-
    // Private -------------------------------------------------------
 
    public synchronized void validateFile() throws ActiveMQException {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5bf5afd/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index dad9300..2f6cb7f 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -680,17 +680,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    }
 
    @Override
-   protected final void finalize() throws Throwable {
-      if (state != SERVER_STATE.STOPPED) {
-         ActiveMQServerLogger.LOGGER.serverFinalisedWIthoutBeingSTopped();
-
-         stop();
-      }
-
-      super.finalize();
-   }
-
-   @Override
    public void setState(SERVER_STATE state) {
       this.state = state;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5bf5afd/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java
index 630cdf5..8caba17 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java
@@ -435,6 +435,7 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
       coreAddressConfiguration.setName(TOPIC);
       CoreQueueConfiguration coreQueueConfiguration = new 
CoreQueueConfiguration();
       coreQueueConfiguration.setName(TOPIC);
+      coreQueueConfiguration.setAddress(TOPIC);
       coreQueueConfiguration.setRoutingType(RoutingType.ANYCAST);
       coreAddressConfiguration.addQueueConfiguration(coreQueueConfiguration);
       return coreAddressConfiguration;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5bf5afd/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 3de9203..bc09fa1 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -835,7 +835,7 @@ public class PagingTest extends ActiveMQTestBase {
          ClientMessage message = session.createMessage(true);
 
          if (i < 1000) {
-            message.setExpiration(System.currentTimeMillis() + 1000);
+            message.setExpiration(System.currentTimeMillis() + 100);
          }
 
          message.putIntProperty("tst-count", i);
@@ -852,12 +852,7 @@ public class PagingTest extends ActiveMQTestBase {
       session.commit();
       producer.close();
 
-      for (long timeout = System.currentTimeMillis() + 60000; timeout > 
System.currentTimeMillis() && getMessageCount(qEXP) < 1000; ) {
-         System.out.println("count = " + getMessageCount(qEXP));
-         Thread.sleep(100);
-      }
-
-      assertEquals(1000, getMessageCount(qEXP));
+      Wait.assertEquals(1000, qEXP::getMessageCount);
 
       session.start();
 
@@ -874,10 +869,7 @@ public class PagingTest extends ActiveMQTestBase {
 
       assertNull(consumer.receiveImmediate());
 
-      for (long timeout = System.currentTimeMillis() + 5000; timeout > 
System.currentTimeMillis() && getMessageCount(queue1) != 0; ) {
-         Thread.sleep(100);
-      }
-      assertEquals(0, getMessageCount(queue1));
+      Wait.assertEquals(0, queue1::getMessageCount);
 
       consumer.close();
 

Reply via email to