This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push: new 720f60a ARTEMIS-2297 Avoiding Split Brains during replication catch up when no quorum is established new e796c24 This closes #2611 720f60a is described below commit 720f60ace27ef15dddffbbb3f6b93bca11a80e67 Author: Clebert Suconic <clebertsuco...@apache.org> AuthorDate: Tue Apr 9 15:23:45 2019 -0400 ARTEMIS-2297 Avoiding Split Brains during replication catch up when no quorum is established --- .../artemis/api/core/ActiveMQExceptionType.java | 7 +- .../core/ActiveMQReplicationTimeooutException.java | 34 ++++ .../protocol/core/impl/RemotingConnectionImpl.java | 8 + .../journal/AbstractJournalStorageManager.java | 2 +- .../impl/journal/JournalStorageManager.java | 2 +- .../core/replication/ReplicationEndpoint.java | 51 ++++- .../core/replication/ReplicationManager.java | 32 ++- .../artemis/core/server/ActiveMQMessageBundle.java | 3 +- .../core/server/impl/ActiveMQServerImpl.java | 41 ++++ .../server/impl/SharedNothingLiveActivation.java | 2 +- .../cluster/failover/ReplicaTimeoutTest.java | 220 +++++++++++++++++++++ .../integration/replication/ReplicationTest.java | 2 +- 12 files changed, 389 insertions(+), 15 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java index 7cec2e4..0314901 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java @@ -255,8 +255,13 @@ public enum ActiveMQExceptionType { public ActiveMQException createException(String msg) { return new ActiveMQShutdownException(msg); } + }, + REPLICATION_TIMEOUT_ERROR(220) { + @Override + public ActiveMQException createException(String msg) { + return new ActiveMQReplicationTimeooutException(msg); + } }; - private static final 
Map<Integer, ActiveMQExceptionType> TYPE_MAP; static { diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQReplicationTimeooutException.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQReplicationTimeooutException.java new file mode 100644 index 0000000..b0075c3 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQReplicationTimeooutException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core; + +/** + * Thrown when the replication synchronization process times out, i.e. the acknowledgement that the initial + * synchronization finished is not received within the configured initial replication sync timeout.
+ */ +public final class ActiveMQReplicationTimeooutException extends ActiveMQException { + + private static final long serialVersionUID = -4486139158452585899L; + + public ActiveMQReplicationTimeooutException() { + super(ActiveMQExceptionType.REPLICATION_TIMEOUT_ERROR); + } + + public ActiveMQReplicationTimeooutException(String msg) { + super(ActiveMQExceptionType.REPLICATION_TIMEOUT_ERROR, msg); + } +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java index db2b500..00bd704 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java @@ -187,6 +187,14 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement channels.put(channelID, channel); } + public List<Interceptor> getIncomingInterceptors() { + return incomingInterceptors; + } + + public List<Interceptor> getOutgoingInterceptors() { + return outgoingInterceptors; + } + @Override public void fail(final ActiveMQException me, String scaleDownTargetNodeID) { synchronized (failLock) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index cbe6512..a6e47ee 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -185,7 +185,7 @@ public abstract class AbstractJournalStorageManager extends 
CriticalComponentImp protected boolean journalLoaded = false; - private final IOCriticalErrorListener ioCriticalErrorListener; + protected final IOCriticalErrorListener ioCriticalErrorListener; protected final Configuration config; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index fac9221..a9cd892 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -669,7 +669,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { storageManagerLock.writeLock().lock(); try { if (replicator != null) { - replicator.sendSynchronizationDone(nodeID, initialReplicationSyncTimeout); + replicator.sendSynchronizationDone(nodeID, initialReplicationSyncTimeout, ioCriticalErrorListener); performCachedLargeMessageDeletes(); } } finally { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index 998bbcf..bdbee67 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -23,13 +23,16 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.EnumSet; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import 
org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; @@ -50,6 +53,7 @@ import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.ChannelHandler; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationAddMessage; @@ -124,6 +128,9 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon private Executor executor; + private List<Interceptor> outgoingInterceptors = null; + + // Constructors -------------------------------------------------- public ReplicationEndpoint(final ActiveMQServerImpl server, IOCriticalErrorListener criticalErrorListener, @@ -150,6 +157,13 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon journals[id] = journal; } + public void addOutgoingInterceptorForReplication(Interceptor interceptor) { + if (outgoingInterceptors == null) { + outgoingInterceptors = new CopyOnWriteArrayList<>(); + } + outgoingInterceptors.add(interceptor); + } + /** * This is for tests basically, do not use it as its API is not guaranteed for future usage. 
*/ @@ -229,12 +243,16 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon logger.trace("Returning " + response); } - channel.send(response); + sendResponse(response); } else { logger.trace("Response is null, ignoring response"); } } + protected void sendResponse(PacketImpl response) { + channel.send(response); + } + /** * @param packet */ @@ -348,6 +366,20 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon public void setChannel(final Channel channel) { this.channel = channel; + + if (this.channel != null && outgoingInterceptors != null) { + if (channel.getConnection() instanceof RemotingConnectionImpl) { + try { + RemotingConnectionImpl impl = (RemotingConnectionImpl) channel.getConnection(); + for (Interceptor interceptor : outgoingInterceptors) { + impl.getOutgoingInterceptors().add(interceptor); + } + } catch (Throwable e) { + // This is code for embedded or testing, it should not affect server's semantics in case of error + logger.warn(e.getMessage(), e); + } + } + } } private synchronized void finishSynchronization(String liveID) throws Exception { @@ -511,11 +543,6 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon final JournalContent journalContent = SyncDataType.getJournalContentType(packet.getDataType()); final Journal journal = journalsHolder.get(journalContent); - if (packet.getNodeID() != null) { - // At the start of replication, we still do not know which is the nodeID that the live uses. - // This is the point where the backup gets this information. 
- backupQuorum.liveIDSet(packet.getNodeID()); - } Map<Long, JournalSyncFile> mapToFill = filesReservedForSync.get(journalContent); for (Entry<Long, JournalFile> entry : journal.createFilesForBackupSync(packet.getFileIds()).entrySet()) { @@ -523,6 +550,18 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon } FileWrapperJournal syncJournal = new FileWrapperJournal(journal); registerJournal(journalContent.typeByte, syncJournal); + + // We send a response now, to avoid a situation where we handle votes during the deactivation of the live during a failback. + sendResponse(replicationResponseMessage); + replicationResponseMessage = null; + + // This needs to be done after the response is sent, to avoid voting shutting it down for any reason. + if (packet.getNodeID() != null) { + // At the start of replication, we still do not know which is the nodeID that the live uses. + // This is the point where the backup gets this information. + backupQuorum.liveIDSet(packet.getNodeID()); + } + break; default: throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledDataType(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index b307789..1d1217d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -33,9 +33,11 @@ import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.ActiveMQReplicationTimeooutException; import org.apache.activemq.artemis.api.core.Pair; import 
org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.core.journal.impl.JournalFile; @@ -69,7 +71,10 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.cluster.ClusterManager; +import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ReusableLatch; @@ -107,6 +112,8 @@ public final class ReplicationManager implements ActiveMQComponent { } } + private final ActiveMQServer server; + private final ResponseHandler responseHandler = new ResponseHandler(); private final Channel replicatingChannel; @@ -136,10 +143,12 @@ public final class ReplicationManager implements ActiveMQComponent { /** * @param remotingConnection */ - public ReplicationManager(CoreRemotingConnection remotingConnection, + public ReplicationManager(ActiveMQServer server, + CoreRemotingConnection remotingConnection, final long timeout, final long initialReplicationSyncTimeout, final ExecutorFactory ioExecutorFactory) { + this.server = server; this.ioExecutorFactory = ioExecutorFactory; this.initialReplicationSyncTimeout = initialReplicationSyncTimeout; this.replicatingChannel = 
remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1); @@ -631,7 +640,7 @@ public final class ReplicationManager implements ActiveMQComponent { * * @param nodeID */ - public void sendSynchronizationDone(String nodeID, long initialReplicationSyncTimeout) { + public void sendSynchronizationDone(String nodeID, long initialReplicationSyncTimeout, IOCriticalErrorListener criticalErrorListener) throws ActiveMQReplicationTimeooutException { if (enabled) { if (logger.isTraceEnabled()) { @@ -642,8 +651,25 @@ public final class ReplicationManager implements ActiveMQComponent { sendReplicatePacket(new ReplicationStartSyncMessage(nodeID)); try { if (!synchronizationIsFinishedAcknowledgement.await(initialReplicationSyncTimeout)) { + ActiveMQReplicationTimeooutException exception = ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout); + + if (server != null) { + try { + ClusterManager clusterManager = server.getClusterManager(); + if (clusterManager != null) { + QuorumManager manager = clusterManager.getQuorumManager(); + if (criticalErrorListener != null && manager != null && manager.getMaxClusterSize() <= 2) { + criticalErrorListener.onIOException(exception, exception.getMessage(), null); + } + } + } catch (Throwable e) { + // if NPE or anything else, continue as nothing changed + logger.warn(e.getMessage(), e); + } + } + logger.trace("sendSynchronizationDone wasn't finished in time"); - throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout); + throw exception; } } catch (InterruptedException e) { logger.debug(e); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index 5cb8c60..ff92501 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQInvalidTransientQueueUseExce import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached; +import org.apache.activemq.artemis.api.core.ActiveMQReplicationTimeooutException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.ActiveMQSessionCreationException; import org.apache.activemq.artemis.api.core.ActiveMQUnexpectedRoutingTypeForAddress; @@ -370,7 +371,7 @@ public interface ActiveMQMessageBundle { IllegalArgumentException invalidMessageLoadBalancingType(String val); @Message(id = 229114, value = "Replication synchronization process timed out after waiting {0} milliseconds", format = Message.Format.MESSAGE_FORMAT) - IllegalStateException replicationSynchronizationTimeout(long timeout); + ActiveMQReplicationTimeooutException replicationSynchronizationTimeout(long timeout); @Message(id = 229115, value = "Colocated Policy hasn''t different type live and backup", format = Message.Format.MESSAGE_FORMAT) ActiveMQIllegalStateException liveBackupMismatch(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 7e8f3a3..c33fedf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -325,6 +325,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { private CriticalAnalyzer analyzer; + // This is a callback to be called right after an activation
is created + private Runnable afterActivationCreated; + //todo think about moving this to the activation private final List<SimpleString> scaledDownNodeIDs = new ArrayList<>(); @@ -459,6 +462,24 @@ public class ActiveMQServerImpl implements ActiveMQServer { // life-cycle methods // ---------------------------------------------------------------- + /** + * A callback for tests, run once right after the next activation is created. + * @return the pending callback, or null if none is set + */ + public Runnable getAfterActivationCreated() { + return afterActivationCreated; + } + + /** + * Sets a callback for tests, run once right after the next activation is created. + * @param afterActivationCreated the callback to run + * @return this server, for chaining + */ + public ActiveMQServerImpl setAfterActivationCreated(Runnable afterActivationCreated) { + this.afterActivationCreated = afterActivationCreated; + return this; + } + /* * Can be overridden for tests */ @@ -560,6 +581,16 @@ public class ActiveMQServerImpl implements ActiveMQServer { if (!haPolicy.isBackup()) { activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO); + if (afterActivationCreated != null) { + try { + afterActivationCreated.run(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); // test hook only: this is not supposed to happen; if it does, it comes from embedded test code + } + + afterActivationCreated = null; + } + if (haPolicy.isWaitForActivation()) { activation.run(); } else { @@ -579,6 +610,16 @@ public class ActiveMQServerImpl implements ActiveMQServer { activation = haPolicy.createActivation(this, wasLive, activationParams, shutdownOnCriticalIO); } + if (afterActivationCreated != null) { + try { + afterActivationCreated.run(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); // test hook only: this is not supposed to happen, and if it does + // it comes from embedded code in tests + } + afterActivationCreated = null; + } + if (logger.isTraceEnabled()) { logger.trace("starting backupActivation"); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java index c03fd19..a4d8db2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java @@ -169,7 +169,7 @@ public class SharedNothingLiveActivation extends LiveActivation { ReplicationFailureListener listener = new ReplicationFailureListener(); rc.addCloseListener(listener); rc.addFailureListener(listener); - replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), replicatedPolicy.getInitialReplicationSyncTimeout(), activeMQServer.getIOExecutorFactory()); + replicationManager = new ReplicationManager(activeMQServer, rc, clusterConnection.getCallTimeout(), replicatedPolicy.getInitialReplicationSyncTimeout(), activeMQServer.getIOExecutorFactory()); replicationManager.start(); Thread t = new Thread(new Runnable() { @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicaTimeoutTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicaTimeoutTest.java new file mode 100644 index 0000000..c1ba9d3 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicaTimeoutTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.cluster.failover; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.core.server.NodeManager; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; +import org.apache.activemq.artemis.core.server.impl.InVMNodeManager; +import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation; +import 
org.apache.activemq.artemis.junit.Wait; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer; +import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils; +import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class ReplicaTimeoutTest extends ActiveMQTestBase { + + protected ServerLocator locator; + + protected static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress"); + + @Before + public void setup() { + locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(getConnectorTransportConfiguration(true), getConnectorTransportConfiguration(false))).setRetryInterval(50); + } + + protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) { + return TransportConfigurationUtils.getInVMAcceptor(live); + } + + protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) { + return TransportConfigurationUtils.getInVMConnector(live); + } + + protected NodeManager createReplicatedBackupNodeManager(Configuration backupConfig) { + return new InVMNodeManager(true, backupConfig.getJournalLocation()); + } + + protected TestableServer createTestableServer(Configuration config, NodeManager nodeManager) throws Exception { + boolean isBackup = config.getHAPolicyConfiguration() instanceof ReplicaPolicyConfiguration || config.getHAPolicyConfiguration() instanceof SharedStoreSlavePolicyConfiguration; + return new SameProcessActiveMQServer(createInVMFailoverServer(true, config, nodeManager, isBackup ? 
2 : 1)); + } + + protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator, + int topologyMembers) throws Exception { + CountDownLatch countDownLatch = new CountDownLatch(topologyMembers); + + locator.addClusterTopologyListener(new FailoverTestBase.LatchClusterTopologyListener(countDownLatch)); + + ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory(); + addSessionFactory(sf); + + Assert.assertTrue("topology members expected " + topologyMembers, countDownLatch.await(5, TimeUnit.SECONDS)); + return sf; + } + + protected ClientSessionFactoryInternal createSessionFactory() throws Exception { + locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setReconnectAttempts(300).setRetryInterval(100); + + return createSessionFactoryAndWaitForTopology(locator, 2); + } + + protected ClientSession createSession(ClientSessionFactory sf1, + boolean autoCommitSends, + boolean autoCommitAcks) throws Exception { + return addClientSession(sf1.createSession(autoCommitSends, autoCommitAcks)); + } + + protected void crash(TestableServer liveServer, + TestableServer backupServer, + ClientSession... 
sessions) throws Exception { + if (sessions.length > 0) { + for (ClientSession session : sessions) { + waitForRemoteBackup(session.getSessionFactory(), 5, true, backupServer.getServer()); + } + } else { + waitForRemoteBackup(null, 5, true, backupServer.getServer()); + } + liveServer.crash(true, true, sessions); + } + + @Test//(timeout = 120000) + public void testFailbackTimeout() throws Exception { + AssertionLoggerHandler.startCapture(); + try { + TestableServer backupServer = null; + TestableServer liveServer = null; + ClientSessionFactory sf = null; + try { + final TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); + final TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); + final TransportConfiguration backupAcceptor = getAcceptorTransportConfiguration(false); + + Configuration backupConfig = createDefaultInVMConfig(); + Configuration liveConfig = createDefaultInVMConfig(); + + ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, null); + ((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setInitialReplicationSyncTimeout(1000); + ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setInitialReplicationSyncTimeout(1000); + + backupConfig.setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)). + setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true)).setSecurityEnabled(false); + liveConfig.setBindingsDirectory(getBindingsDir(0, false)).setJournalDirectory(getJournalDir(0, false)). 
+ setPagingDirectory(getPageDir(0, false)).setLargeMessagesDirectory(getLargeMessagesDir(0, false)).setSecurityEnabled(false); + + ((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setCheckForLiveServer(true); + ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(2).setAllowFailBack(true); + ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setRestartBackup(false); + + NodeManager nodeManager = createReplicatedBackupNodeManager(backupConfig); + + backupServer = createTestableServer(backupConfig, nodeManager); + + liveConfig.clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)); + + liveServer = createTestableServer(liveConfig, nodeManager); + + AtomicBoolean ignoreIntercept = new AtomicBoolean(false); + + final TestableServer theBackup = backupServer; + + liveServer.start(); + backupServer.start(); + + Wait.assertTrue(backupServer.getServer()::isReplicaSync); + + sf = createSessionFactory(); + + ClientSession session = createSession(sf, true, true); + + session.createQueue(ADDRESS, ADDRESS, null, true); + + crash(liveServer, backupServer, session); + + Wait.assertTrue(backupServer.getServer()::isActive); + + ignoreIntercept.set(true); + + ((ActiveMQServerImpl) backupServer.getServer()).setAfterActivationCreated(new Runnable() { + @Override + public void run() { + //theBackup.getServer().getActivation() + + SharedNothingBackupActivation activation = (SharedNothingBackupActivation) theBackup.getServer().getActivation(); + activation.getReplicationEndpoint().addOutgoingInterceptorForReplication(new Interceptor() { + @Override + public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { + if (ignoreIntercept.get() && packet.getType() == PacketImpl.REPLICATION_RESPONSE_V2) { + return false; + } + return true; + } + }); + } + }); + + liveServer.start(); + + Assert.assertTrue(Wait.waitFor(() -> 
AssertionLoggerHandler.findText("AMQ229114"))); + + Wait.assertFalse(liveServer.getServer()::isStarted); + + } finally { + if (sf != null) { + sf.close(); + } + try { + liveServer.getServer().stop(); + } catch (Throwable ignored) { + } + try { + backupServer.getServer().stop(); + } catch (Throwable ignored) { + } + } + } finally { + AssertionLoggerHandler.stopCapture(); + } + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index c27ea68..1c89425 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -191,7 +191,7 @@ public final class ReplicationTest extends ActiveMQTestBase { setupServer(false); try { ClientSessionFactory sf = createSessionFactory(locator); - manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), sf.getServerLocator().getCallTimeout(), factory); + manager = new ReplicationManager(null, (CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), sf.getServerLocator().getCallTimeout(), factory); addActiveMQComponent(manager); manager.start(); Assert.fail("Exception was expected");