This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 720f60a  ARTEMIS-2297 Avoiding Split Brains during replication catch 
up when no quorum is established
     new e796c24  This closes #2611
720f60a is described below

commit 720f60ace27ef15dddffbbb3f6b93bca11a80e67
Author: Clebert Suconic <clebertsuco...@apache.org>
AuthorDate: Tue Apr 9 15:23:45 2019 -0400

    ARTEMIS-2297 Avoiding Split Brains during replication catch up when no 
quorum is established
---
 .../artemis/api/core/ActiveMQExceptionType.java    |   7 +-
 .../core/ActiveMQReplicationTimeooutException.java |  34 ++++
 .../protocol/core/impl/RemotingConnectionImpl.java |   8 +
 .../journal/AbstractJournalStorageManager.java     |   2 +-
 .../impl/journal/JournalStorageManager.java        |   2 +-
 .../core/replication/ReplicationEndpoint.java      |  51 ++++-
 .../core/replication/ReplicationManager.java       |  32 ++-
 .../artemis/core/server/ActiveMQMessageBundle.java |   3 +-
 .../core/server/impl/ActiveMQServerImpl.java       |  41 ++++
 .../server/impl/SharedNothingLiveActivation.java   |   2 +-
 .../cluster/failover/ReplicaTimeoutTest.java       | 220 +++++++++++++++++++++
 .../integration/replication/ReplicationTest.java   |   2 +-
 12 files changed, 389 insertions(+), 15 deletions(-)

diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
index 7cec2e4..0314901 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
@@ -255,8 +255,13 @@ public enum ActiveMQExceptionType {
       public ActiveMQException createException(String msg) {
          return new ActiveMQShutdownException(msg);
       }
+   },
+   REPLICATION_TIMEOUT_ERROR(220) {
+      @Override
+      public ActiveMQException createException(String msg) {
+         return new ActiveMQReplicationTimeooutException(msg);
+      }
    };
-
    private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;
 
    static {
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQReplicationTimeooutException.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQReplicationTimeooutException.java
new file mode 100644
index 0000000..b0075c3
--- /dev/null
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQReplicationTimeooutException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.api.core;
+
+/**
+ * Thrown when the replication synchronization process is not acknowledged within
+ * the initial replication sync timeout.
+ */
+public final class ActiveMQReplicationTimeooutException extends 
ActiveMQException {
+
+   private static final long serialVersionUID = -4486139158452585899L;
+
+   public ActiveMQReplicationTimeooutException() {
+      super(ActiveMQExceptionType.REPLICATION_TIMEOUT_ERROR);
+   }
+
+   public ActiveMQReplicationTimeooutException(String msg) {
+      super(ActiveMQExceptionType.REPLICATION_TIMEOUT_ERROR, msg);
+   }
+}
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
index db2b500..00bd704 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -187,6 +187,14 @@ public class RemotingConnectionImpl extends 
AbstractRemotingConnection implement
       channels.put(channelID, channel);
    }
 
+   public List<Interceptor> getIncomingInterceptors() {
+      return incomingInterceptors;
+   }
+
+   public List<Interceptor> getOutgoingInterceptors() {
+      return outgoingInterceptors;
+   }
+
    @Override
    public void fail(final ActiveMQException me, String scaleDownTargetNodeID) {
       synchronized (failLock) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index cbe6512..a6e47ee 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -185,7 +185,7 @@ public abstract class AbstractJournalStorageManager extends 
CriticalComponentImp
 
    protected boolean journalLoaded = false;
 
-   private final IOCriticalErrorListener ioCriticalErrorListener;
+   protected final IOCriticalErrorListener ioCriticalErrorListener;
 
    protected final Configuration config;
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index fac9221..a9cd892 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -669,7 +669,7 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
          storageManagerLock.writeLock().lock();
          try {
             if (replicator != null) {
-               replicator.sendSynchronizationDone(nodeID, 
initialReplicationSyncTimeout);
+               replicator.sendSynchronizationDone(nodeID, 
initialReplicationSyncTimeout, ioCriticalErrorListener);
                performCachedLargeMessageDeletes();
             }
          } finally {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
index 998bbcf..bdbee67 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java
@@ -23,13 +23,16 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.Configuration;
@@ -50,6 +53,7 @@ import org.apache.activemq.artemis.core.protocol.core.Channel;
 import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import 
org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationAddMessage;
@@ -124,6 +128,9 @@ public final class ReplicationEndpoint implements 
ChannelHandler, ActiveMQCompon
 
    private Executor executor;
 
+   private List<Interceptor> outgoingInterceptors = null;
+
+
    // Constructors --------------------------------------------------
    public ReplicationEndpoint(final ActiveMQServerImpl server,
                               IOCriticalErrorListener criticalErrorListener,
@@ -150,6 +157,13 @@ public final class ReplicationEndpoint implements 
ChannelHandler, ActiveMQCompon
       journals[id] = journal;
    }
 
+   public void addOutgoingInterceptorForReplication(Interceptor interceptor) {
+      if (outgoingInterceptors == null) {
+         outgoingInterceptors = new CopyOnWriteArrayList<>();
+      }
+      outgoingInterceptors.add(interceptor);
+   }
+
    /**
     * This is for tests basically, do not use it as its API is not guaranteed 
for future usage.
     */
@@ -229,12 +243,16 @@ public final class ReplicationEndpoint implements 
ChannelHandler, ActiveMQCompon
             logger.trace("Returning " + response);
          }
 
-         channel.send(response);
+         sendResponse(response);
       } else {
          logger.trace("Response is null, ignoring response");
       }
    }
 
+   protected void sendResponse(PacketImpl response) {
+      channel.send(response);
+   }
+
    /**
     * @param packet
     */
@@ -348,6 +366,20 @@ public final class ReplicationEndpoint implements 
ChannelHandler, ActiveMQCompon
 
    public void setChannel(final Channel channel) {
       this.channel = channel;
+
+      if (this.channel != null && outgoingInterceptors != null) {
+         if (channel.getConnection() instanceof RemotingConnectionImpl)  {
+            try {
+               RemotingConnectionImpl impl = (RemotingConnectionImpl) 
channel.getConnection();
+               for (Interceptor interceptor : outgoingInterceptors) {
+                  impl.getOutgoingInterceptors().add(interceptor);
+               }
+            } catch (Throwable e) {
+               // This is code for embedded or testing, it should not affect 
server's semantics in case of error
+               logger.warn(e.getMessage(), e);
+            }
+         }
+      }
    }
 
    private synchronized void finishSynchronization(String liveID) throws 
Exception {
@@ -511,11 +543,6 @@ public final class ReplicationEndpoint implements 
ChannelHandler, ActiveMQCompon
             final JournalContent journalContent = 
SyncDataType.getJournalContentType(packet.getDataType());
             final Journal journal = journalsHolder.get(journalContent);
 
-            if (packet.getNodeID() != null) {
-               // At the start of replication, we still do not know which is 
the nodeID that the live uses.
-               // This is the point where the backup gets this information.
-               backupQuorum.liveIDSet(packet.getNodeID());
-            }
             Map<Long, JournalSyncFile> mapToFill = 
filesReservedForSync.get(journalContent);
 
             for (Entry<Long, JournalFile> entry : 
journal.createFilesForBackupSync(packet.getFileIds()).entrySet()) {
@@ -523,6 +550,18 @@ public final class ReplicationEndpoint implements 
ChannelHandler, ActiveMQCompon
             }
             FileWrapperJournal syncJournal = new FileWrapperJournal(journal);
             registerJournal(journalContent.typeByte, syncJournal);
+
+            // We send a response now, to avoid a situation where we handle 
votes during the deactivation of the live during a failback.
+            sendResponse(replicationResponseMessage);
+            replicationResponseMessage = null;
+
+            // This needs to be done after the response is sent, to avoid 
voting shutting it down for any reason.
+            if (packet.getNodeID() != null) {
+               // At the start of replication, we still do not know which is 
the nodeID that the live uses.
+               // This is the point where the backup gets this information.
+               backupQuorum.liveIDSet(packet.getNodeID());
+            }
+
             break;
          default:
             throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledDataType();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
index b307789..1d1217d 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java
@@ -33,9 +33,11 @@ import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import 
org.apache.activemq.artemis.api.core.ActiveMQReplicationTimeooutException;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.impl.JournalFile;
@@ -69,7 +71,10 @@ import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
+import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.ReusableLatch;
@@ -107,6 +112,8 @@ public final class ReplicationManager implements 
ActiveMQComponent {
       }
    }
 
+   private final ActiveMQServer server;
+
    private final ResponseHandler responseHandler = new ResponseHandler();
 
    private final Channel replicatingChannel;
@@ -136,10 +143,12 @@ public final class ReplicationManager implements 
ActiveMQComponent {
    /**
     * @param remotingConnection
     */
-   public ReplicationManager(CoreRemotingConnection remotingConnection,
+   public ReplicationManager(ActiveMQServer server,
+                             CoreRemotingConnection remotingConnection,
                              final long timeout,
                              final long initialReplicationSyncTimeout,
                              final ExecutorFactory ioExecutorFactory) {
+      this.server = server;
       this.ioExecutorFactory = ioExecutorFactory;
       this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
       this.replicatingChannel = 
remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
@@ -631,7 +640,7 @@ public final class ReplicationManager implements 
ActiveMQComponent {
     *
     * @param nodeID
     */
-   public void sendSynchronizationDone(String nodeID, long 
initialReplicationSyncTimeout) {
+   public void sendSynchronizationDone(String nodeID, long 
initialReplicationSyncTimeout, IOCriticalErrorListener criticalErrorListener) 
throws ActiveMQReplicationTimeooutException {
       if (enabled) {
 
          if (logger.isTraceEnabled()) {
@@ -642,8 +651,25 @@ public final class ReplicationManager implements 
ActiveMQComponent {
          sendReplicatePacket(new ReplicationStartSyncMessage(nodeID));
          try {
             if 
(!synchronizationIsFinishedAcknowledgement.await(initialReplicationSyncTimeout))
 {
+               ActiveMQReplicationTimeooutException exception = 
ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
+
+               if (server != null) {
+                  try {
+                     ClusterManager clusterManager = 
server.getClusterManager();
+                     if (clusterManager != null) {
+                        QuorumManager manager = 
clusterManager.getQuorumManager();
+                        if (criticalErrorListener != null && manager != null 
&& manager.getMaxClusterSize() <= 2) {
+                           criticalErrorListener.onIOException(exception, 
exception.getMessage(), null);
+                        }
+                     }
+                  } catch (Throwable e) {
+                     // if NPE or anything else, continue as if nothing changed
+                     logger.warn(e.getMessage(), e);
+                  }
+               }
+
                logger.trace("sendSynchronizationDone wasn't finished in time");
-               throw 
ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
+               throw exception;
             }
          } catch (InterruptedException e) {
             logger.debug(e);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 5cb8c60..ff92501 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -38,6 +38,7 @@ import 
org.apache.activemq.artemis.api.core.ActiveMQInvalidTransientQueueUseExce
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import 
org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
+import 
org.apache.activemq.artemis.api.core.ActiveMQReplicationTimeooutException;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.ActiveMQSessionCreationException;
 import 
org.apache.activemq.artemis.api.core.ActiveMQUnexpectedRoutingTypeForAddress;
@@ -370,7 +371,7 @@ public interface ActiveMQMessageBundle {
    IllegalArgumentException invalidMessageLoadBalancingType(String val);
 
    @Message(id = 229114, value = "Replication synchronization process timed 
out after waiting {0} milliseconds", format = Message.Format.MESSAGE_FORMAT)
-   IllegalStateException replicationSynchronizationTimeout(long timeout);
+   ActiveMQReplicationTimeooutException replicationSynchronizationTimeout(long 
timeout);
 
    @Message(id = 229115, value = "Colocated Policy hasn''t different type live 
and backup", format = Message.Format.MESSAGE_FORMAT)
    ActiveMQIllegalStateException liveBackupMismatch();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 7e8f3a3..c33fedf 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -325,6 +325,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    private CriticalAnalyzer analyzer;
 
+   // This is a callback to be called right after an activation is created
+   private Runnable afterActivationCreated;
+
    //todo think about moving this to the activation
    private final List<SimpleString> scaledDownNodeIDs = new ArrayList<>();
 
@@ -459,6 +462,24 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    // life-cycle methods
    // ----------------------------------------------------------------
 
+   /**
+    * A Callback for tests
+    * @return
+    */
+   public Runnable getAfterActivationCreated() {
+      return afterActivationCreated;
+   }
+
+   /**
+    * A Callback for tests
+    * @param afterActivationCreated
+    * @return
+    */
+   public ActiveMQServerImpl setAfterActivationCreated(Runnable 
afterActivationCreated) {
+      this.afterActivationCreated = afterActivationCreated;
+      return this;
+   }
+
    /*
     * Can be overridden for tests
     */
@@ -560,6 +581,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          if (!haPolicy.isBackup()) {
             activation = haPolicy.createActivation(this, false, 
activationParams, shutdownOnCriticalIO);
 
+            if (afterActivationCreated != null) {
+               try {
+                  afterActivationCreated.run();
+               } catch (Throwable e) {
+                  logger.warn(e.getMessage(), e); // just debug, this is not supposed to happen, and if it does it will be embedded code from tests
+               }
+
+               afterActivationCreated = null;
+            }
+
             if (haPolicy.isWaitForActivation()) {
                activation.run();
             } else {
@@ -579,6 +610,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                activation = haPolicy.createActivation(this, wasLive, 
activationParams, shutdownOnCriticalIO);
             }
 
+            if (afterActivationCreated != null) {
+               try {
+                  afterActivationCreated.run();
+               } catch (Throwable e) {
+                  logger.warn(e.getMessage(), e); // just debug, this is not supposed to happen, and if it does
+                  // it will be embedded code from tests
+               }
+               afterActivationCreated = null;
+            }
+
             if (logger.isTraceEnabled()) {
                logger.trace("starting backupActivation");
             }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
index c03fd19..a4d8db2 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
@@ -169,7 +169,7 @@ public class SharedNothingLiveActivation extends 
LiveActivation {
          ReplicationFailureListener listener = new 
ReplicationFailureListener();
          rc.addCloseListener(listener);
          rc.addFailureListener(listener);
-         replicationManager = new ReplicationManager(rc, 
clusterConnection.getCallTimeout(), 
replicatedPolicy.getInitialReplicationSyncTimeout(), 
activeMQServer.getIOExecutorFactory());
+         replicationManager = new ReplicationManager(activeMQServer, rc, 
clusterConnection.getCallTimeout(), 
replicatedPolicy.getInitialReplicationSyncTimeout(), 
activeMQServer.getIOExecutorFactory());
          replicationManager.start();
          Thread t = new Thread(new Runnable() {
             @Override
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicaTimeoutTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicaTimeoutTest.java
new file mode 100644
index 0000000..c1ba9d3
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicaTimeoutTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.cluster.failover;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import 
org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
+import 
org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
+import 
org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
+import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
+import 
org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
+import org.apache.activemq.artemis.junit.Wait;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import 
org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
+import 
org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils;
+import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ReplicaTimeoutTest extends ActiveMQTestBase {
+
+   protected ServerLocator locator;
+
+   protected static final SimpleString ADDRESS = new 
SimpleString("FailoverTestAddress");
+
+   @Before
+   public void setup() {
+      locator = 
addServerLocator(ActiveMQClient.createServerLocatorWithHA(getConnectorTransportConfiguration(true),
 getConnectorTransportConfiguration(false))).setRetryInterval(50);
+   }
+
+   protected TransportConfiguration getAcceptorTransportConfiguration(final 
boolean live) {
+      return TransportConfigurationUtils.getInVMAcceptor(live);
+   }
+
+   protected TransportConfiguration getConnectorTransportConfiguration(final 
boolean live) {
+      return TransportConfigurationUtils.getInVMConnector(live);
+   }
+
+   protected NodeManager createReplicatedBackupNodeManager(Configuration 
backupConfig) {
+      return new InVMNodeManager(true, backupConfig.getJournalLocation());
+   }
+
+   protected TestableServer createTestableServer(Configuration config, 
NodeManager nodeManager) throws Exception {
+      boolean isBackup = config.getHAPolicyConfiguration() instanceof 
ReplicaPolicyConfiguration || config.getHAPolicyConfiguration() instanceof 
SharedStoreSlavePolicyConfiguration;
+      return new SameProcessActiveMQServer(createInVMFailoverServer(true, 
config, nodeManager, isBackup ? 2 : 1));
+   }
+
+   protected ClientSessionFactoryInternal 
createSessionFactoryAndWaitForTopology(ServerLocator locator,
+                                                                               
  int topologyMembers) throws Exception {
+      CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
+
+      locator.addClusterTopologyListener(new 
FailoverTestBase.LatchClusterTopologyListener(countDownLatch));
+
+      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) 
locator.createSessionFactory();
+      addSessionFactory(sf);
+
+      Assert.assertTrue("topology members expected " + topologyMembers, 
countDownLatch.await(5, TimeUnit.SECONDS));
+      return sf;
+   }
+
+   protected ClientSessionFactoryInternal createSessionFactory() throws 
Exception {
+      
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setReconnectAttempts(300).setRetryInterval(100);
+
+      return createSessionFactoryAndWaitForTopology(locator, 2);
+   }
+
+   protected ClientSession createSession(ClientSessionFactory sf1,
+                                         boolean autoCommitSends,
+                                         boolean autoCommitAcks) throws 
Exception {
+      return addClientSession(sf1.createSession(autoCommitSends, 
autoCommitAcks));
+   }
+
+   protected void crash(TestableServer liveServer,
+                        TestableServer backupServer,
+                        ClientSession... sessions) throws Exception {
+      if (sessions.length > 0) {
+         for (ClientSession session : sessions) {
+            waitForRemoteBackup(session.getSessionFactory(), 5, true, 
backupServer.getServer());
+         }
+      } else {
+         waitForRemoteBackup(null, 5, true, backupServer.getServer());
+      }
+      liveServer.crash(true, true, sessions);
+   }
+
+   /**
+    * Exercises fail-back when the backup never answers the replication synchronization.
+    * <p>
+    * Scenario:
+    * <ol>
+    * <li>a live/backup replication pair is started with an initial replication sync timeout
+    * of 1000 on both sides, and the test waits until the backup reports replica sync;</li>
+    * <li>a queue is created, the live server is crashed and the backup becomes active;</li>
+    * <li>a hook on the (former) backup installs an outgoing replication interceptor that
+    * drops every {@code REPLICATION_RESPONSE_V2} packet;</li>
+    * <li>the live server is started again; the test expects log code {@code AMQ229114} to be
+    * captured (presumably the replication sync timeout message -- confirm against the message
+    * bundle) and the live server to end up not started.</li>
+    * </ol>
+    *
+    * @throws Exception if any step of the scenario fails
+    */
+   @Test//(timeout = 120000)
+   public void testFailbackTimeout() throws Exception {
+      AssertionLoggerHandler.startCapture();
+      try {
+         TestableServer backupServer = null;
+         TestableServer liveServer = null;
+         ClientSessionFactory sf = null;
+         try {
+            final TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
+            final TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
+            final TransportConfiguration backupAcceptor = getAcceptorTransportConfiguration(false);
+
+            Configuration backupConfig = createDefaultInVMConfig();
+            Configuration liveConfig = createDefaultInVMConfig();
+
+            ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, null);
+            // Short sync timeout so the expected failure shows up quickly.
+            ((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setInitialReplicationSyncTimeout(1000);
+            ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setInitialReplicationSyncTimeout(1000);
+
+            backupConfig.setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).
+               setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true)).setSecurityEnabled(false);
+            liveConfig.setBindingsDirectory(getBindingsDir(0, false)).setJournalDirectory(getJournalDir(0, false)).
+               setPagingDirectory(getPageDir(0, false)).setLargeMessagesDirectory(getLargeMessagesDir(0, false)).setSecurityEnabled(false);
+
+            ((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setCheckForLiveServer(true);
+            ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(2).setAllowFailBack(true);
+            ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setRestartBackup(false);
+
+            NodeManager nodeManager = createReplicatedBackupNodeManager(backupConfig);
+
+            backupServer = createTestableServer(backupConfig, nodeManager);
+
+            liveConfig.clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true));
+
+            liveServer = createTestableServer(liveConfig, nodeManager);
+
+            // Gate for the interceptor below; switched on once the backup has taken over.
+            AtomicBoolean ignoreIntercept = new AtomicBoolean(false);
+
+            // Final alias so the anonymous hook below can capture the backup server.
+            final TestableServer theBackup = backupServer;
+
+            liveServer.start();
+            backupServer.start();
+
+            Wait.assertTrue(backupServer.getServer()::isReplicaSync);
+
+            sf = createSessionFactory();
+
+            ClientSession session = createSession(sf, true, true);
+
+            session.createQueue(ADDRESS, ADDRESS, null, true);
+
+            crash(liveServer, backupServer, session);
+
+            Wait.assertTrue(backupServer.getServer()::isActive);
+
+            ignoreIntercept.set(true);
+
+            ((ActiveMQServerImpl) backupServer.getServer()).setAfterActivationCreated(new Runnable() {
+               @Override
+               public void run() {
+                  SharedNothingBackupActivation activation = (SharedNothingBackupActivation) theBackup.getServer().getActivation();
+                  activation.getReplicationEndpoint().addOutgoingInterceptorForReplication(new Interceptor() {
+                     @Override
+                     public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
+                        // Returning false drops the packet: the replication response never goes out.
+                        if (ignoreIntercept.get() && packet.getType() == PacketImpl.REPLICATION_RESPONSE_V2) {
+                           return false;
+                        }
+                        return true;
+                     }
+                  });
+               }
+            });
+
+            liveServer.start();
+
+            Assert.assertTrue(Wait.waitFor(() -> AssertionLoggerHandler.findText("AMQ229114")));
+
+            Wait.assertFalse(liveServer.getServer()::isStarted);
+
+         } finally {
+            // The servers are stopped in a nested finally so that a failure while closing the
+            // session factory cannot leave them running for subsequent tests.
+            try {
+               if (sf != null) {
+                  sf.close();
+               }
+            } finally {
+               if (liveServer != null) {
+                  try {
+                     liveServer.getServer().stop();
+                  } catch (Throwable ignored) {
+                     // Best-effort teardown: the live server is expected to be down already.
+                  }
+               }
+               if (backupServer != null) {
+                  try {
+                     backupServer.getServer().stop();
+                  } catch (Throwable ignored) {
+                     // Best-effort teardown: must not mask the outcome of the test itself.
+                  }
+               }
+            }
+         }
+      } finally {
+         AssertionLoggerHandler.stopCapture();
+      }
+   }
+
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
index c27ea68..1c89425 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java
@@ -191,7 +191,7 @@ public final class ReplicationTest extends ActiveMQTestBase 
{
       setupServer(false);
       try {
          ClientSessionFactory sf = createSessionFactory(locator);
-         manager = new ReplicationManager((CoreRemotingConnection) 
sf.getConnection(), sf.getServerLocator().getCallTimeout(), 
sf.getServerLocator().getCallTimeout(), factory);
+         manager = new ReplicationManager(null, (CoreRemotingConnection) 
sf.getConnection(), sf.getServerLocator().getCallTimeout(), 
sf.getServerLocator().getCallTimeout(), factory);
          addActiveMQComponent(manager);
          manager.start();
          Assert.fail("Exception was expected");

Reply via email to