Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1405
Change subject: PREVIEW: Introduce Strategy Based Replication
......................................................................
PREVIEW: Introduce Strategy Based Replication
This is just preview code to discuss the design.
Change-Id: I1d1012f5541ce786f127866efefb9f3db434fedd
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M asterixdb/asterix-app/src/main/resources/cluster.xml
A
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
A
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AbstractReplicationStrategy.java
A
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java
A
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java
A
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
A
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataNodeFaultToleranceStrategy.java
A
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
A
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoFaultToleranceStrategy.java
A
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
M
asterixdb/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
M
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
M
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
M
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
A
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/replication/AutoFaultToleranceStrategy.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
24 files changed, 1,328 insertions(+), 640 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/05/1405/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index bdce0ca..e66b62c 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -30,8 +30,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.common.config.PropertiesAccessor;
import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.config.PropertiesAccessor;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint;
import org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint;
@@ -259,7 +259,7 @@
});
System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY,
"asterix-build-configuration.xml");
- init(cleanupOnStart);
+ init(true);
while (true) {
Thread.sleep(10000);
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java
index 02be70b..89b25d7 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java
@@ -67,7 +67,7 @@
response.setContentType("application/json");
response.setCharacterEncoding("utf-8");
PrintWriter responseWriter = response.getWriter();
- JSONObject json;
+ JSONObject json = null;
try {
switch (request.getPathInfo() == null ? "" :
request.getPathInfo()) {
@@ -80,11 +80,19 @@
case "/summary":
json = getClusterStateSummaryJSON();
break;
+ case "/kill":
+
ClusterStateManager.INSTANCE.removeNCConfiguration("asterix_nc1");
+ break;
+ case "/join":
+
ClusterStateManager.INSTANCE.addNCConfiguration("asterix_nc1", null);
+ break;
default:
throw new IllegalArgumentException();
}
response.setStatus(HttpServletResponse.SC_OK);
- responseWriter.write(json.toString(4));
+ if (json != null) {
+ responseWriter.write(json.toString(4));
+ }
} catch (IllegalArgumentException e) { // NOSONAR - exception not
logged or rethrown
response.sendError(HttpServletResponse.SC_NOT_FOUND);
} catch (Exception e) {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index b1ca062..aecf235 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -29,26 +29,26 @@
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.api.common.AppRuntimeContextProviderForRecovery;
-import org.apache.asterix.common.api.ThreadExecutor;
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.ThreadExecutor;
import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.BuildProperties;
-import org.apache.asterix.common.config.CompilerProperties;
import org.apache.asterix.common.config.AsterixExtension;
+import org.apache.asterix.common.config.BuildProperties;
+import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.config.CompilerProperties;
import org.apache.asterix.common.config.ExtensionProperties;
import org.apache.asterix.common.config.ExternalProperties;
import org.apache.asterix.common.config.FeedProperties;
+import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.MessagingProperties;
import org.apache.asterix.common.config.MetadataProperties;
import org.apache.asterix.common.config.PropertiesAccessor;
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.config.TransactionProperties;
-import org.apache.asterix.common.config.ClusterProperties;
-import org.apache.asterix.common.config.IPropertiesProvider;
-import org.apache.asterix.common.config.MessagingProperties;
-import org.apache.asterix.common.context.FileMapManager;
import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.FileMapManager;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.library.ILibraryManager;
@@ -138,12 +138,11 @@
private final NCExtensionManager ncExtensionManager;
public NCAppRuntimeContext(INCApplicationContext ncApplicationContext,
List<AsterixExtension> extensions)
- throws AsterixException, InstantiationException,
IllegalAccessException,
- ClassNotFoundException, IOException {
+ throws AsterixException, InstantiationException,
IllegalAccessException, ClassNotFoundException,
+ IOException {
List<AsterixExtension> allExtensions = new ArrayList<>();
this.ncApplicationContext = ncApplicationContext;
- PropertiesAccessor propertiesAccessor =
-
PropertiesAccessor.getInstance(ncApplicationContext.getAppConfig());
+ PropertiesAccessor propertiesAccessor =
PropertiesAccessor.getInstance(ncApplicationContext.getAppConfig());
compilerProperties = new CompilerProperties(propertiesAccessor);
externalProperties = new ExternalProperties(propertiesAccessor);
metadataProperties = new MetadataProperties(propertiesAccessor);
@@ -180,15 +179,13 @@
metadataMergePolicyFactory = new PrefixMergePolicyFactory();
- ILocalResourceRepositoryFactory
persistentLocalResourceRepositoryFactory =
- new PersistentLocalResourceRepositoryFactory(
- ioManager, ncApplicationContext.getNodeId(),
metadataProperties);
+ ILocalResourceRepositoryFactory
persistentLocalResourceRepositoryFactory = new
PersistentLocalResourceRepositoryFactory(
+ ioManager, ncApplicationContext.getNodeId(),
metadataProperties);
localResourceRepository = (PersistentLocalResourceRepository)
persistentLocalResourceRepositoryFactory
.createRepository();
- IAppRuntimeContextProvider asterixAppRuntimeContextProvider =
- new AppRuntimeContextProviderForRecovery(this);
+ IAppRuntimeContextProvider asterixAppRuntimeContextProvider = new
AppRuntimeContextProviderForRecovery(this);
txnSubsystem = new
TransactionSubsystem(ncApplicationContext.getNodeId(),
asterixAppRuntimeContextProvider,
txnProperties);
@@ -208,6 +205,7 @@
activeManager = new ActiveManager(threadExecutor,
ncApplicationContext.getNodeId(),
feedProperties.getMemoryComponentGlobalBudget(),
compilerProperties.getFrameSize());
+ //TODO make it participant based instead of flag
if (ClusterProperties.INSTANCE.isReplicationEnabled()) {
String nodeId = ncApplicationContext.getNodeId();
@@ -227,10 +225,8 @@
* add the partitions that will be replicated in this node as
inactive partitions
*/
//get nodes which replicate to this node
- Set<String> replicationClients =
replicationProperties.getNodeReplicationClients(nodeId);
- //remove the node itself
- replicationClients.remove(nodeId);
- for (String clientId : replicationClients) {
+ Set<String> remoteReplicas =
replicationProperties.getRemoteReplicasIds(nodeId);
+ for (String clientId : remoteReplicas) {
//get the partitions of each client
ClusterPartition[] clientPartitions =
metadataProperties.getNodePartitions().get(clientId);
for (ClusterPartition partition : clientPartitions) {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 8998c6b..7039982 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -25,18 +25,21 @@
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import org.apache.asterix.app.external.ExternalLibraryUtils;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.common.api.AsterixThreadFactory;
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.config.AsterixExtension;
-import org.apache.asterix.common.config.MetadataProperties;
-import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.config.IPropertiesProvider;
import org.apache.asterix.common.config.MessagingProperties;
+import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
+import org.apache.asterix.common.replication.Replica;
import org.apache.asterix.common.transactions.ICheckpointManager;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
@@ -62,13 +65,11 @@
public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
private static final Logger LOGGER =
Logger.getLogger(NCApplicationEntryPoint.class.getName());
- @Option(name = "-initial-run",
- usage = "A flag indicating if it's the first time the NC is
started (default: false)", required = false)
+ @Option(name = "-initial-run", usage = "A flag indicating if it's the
first time the NC is started (default: false)", required = false)
public boolean initialRun = false;
- @Option(name = "-virtual-NC",
- usage = "A flag indicating if this NC is running on virtual
cluster " + "(default: false)",
- required = false)
+ @Option(name = "-virtual-NC", usage = "A flag indicating if this NC is
running on virtual cluster "
+ + "(default: false)", required = false)
public boolean virtualNC = false;
private INCApplicationContext ncApplicationContext = null;
@@ -91,8 +92,8 @@
parser.printUsage(System.err);
throw e;
}
- ncAppCtx.setThreadFactory(new
AsterixThreadFactory(ncAppCtx.getThreadFactory(),
- ncAppCtx.getLifeCycleComponentManager()));
+ ncAppCtx.setThreadFactory(
+ new AsterixThreadFactory(ncAppCtx.getThreadFactory(),
ncAppCtx.getLifeCycleComponentManager()));
ncApplicationContext = ncAppCtx;
nodeId = ncApplicationContext.getNodeId();
if (LOGGER.isLoggable(Level.INFO)) {
@@ -102,12 +103,11 @@
final NodeControllerService controllerService =
(NodeControllerService) ncAppCtx.getControllerService();
if (System.getProperty("java.rmi.server.hostname") == null) {
- System.setProperty("java.rmi.server.hostname", (controllerService)
- .getConfiguration().clusterNetPublicIPAddress);
+ System.setProperty("java.rmi.server.hostname",
+
(controllerService).getConfiguration().clusterNetPublicIPAddress);
}
runtimeContext = new NCAppRuntimeContext(ncApplicationContext,
getExtensions());
- MetadataProperties metadataProperties = ((IPropertiesProvider)
runtimeContext)
- .getMetadataProperties();
+ MetadataProperties metadataProperties = ((IPropertiesProvider)
runtimeContext).getMetadataProperties();
if
(!metadataProperties.getNodeNames().contains(ncApplicationContext.getNodeId()))
{
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Substitute node joining : " +
ncApplicationContext.getNodeId());
@@ -116,8 +116,7 @@
}
runtimeContext.initialize(initialRun);
ncApplicationContext.setApplicationObject(runtimeContext);
- MessagingProperties messagingProperties = ((IPropertiesProvider)
runtimeContext)
- .getMessagingProperties();
+ MessagingProperties messagingProperties = ((IPropertiesProvider)
runtimeContext).getMessagingProperties();
messageBroker = new NCMessageBroker(controllerService,
messagingProperties);
ncApplicationContext.setMessageBroker(messageBroker);
MessagingChannelInterfaceFactory interfaceFactory = new
MessagingChannelInterfaceFactory(
@@ -169,14 +168,22 @@
}
private void startReplicationService() throws InterruptedException {
+ ReplicationProperties replicationProperties = ((IPropertiesProvider)
runtimeContext).getReplicationProperties();
//Open replication channel
- runtimeContext.getReplicationChannel().start();
+ if
(replicationProperties.getReplicationStrategy().isParticipant(nodeId)) {
+ if (nodeId.equals("asterix_nc2")) {
+ System.out.println("Started on " + nodeId + ": Remote Primary
Replicas: "
+ +
replicationProperties.getReplicationStrategy().getRemotePrimaryReplicas(nodeId).stream()
+
.map(Replica::getId).collect(Collectors.toSet()));
+ }
+ runtimeContext.getReplicationChannel().start();
- //Check the state of remote replicas
- runtimeContext.getReplicationManager().initializeReplicasState();
+ //Check the state of remote replicas
+ runtimeContext.getReplicationManager().initializeReplicasState();
- //Start replication after the state of remote replicas has been
initialized.
- runtimeContext.getReplicationManager().startReplicationThreads();
+ //Start replication after the state of remote replicas has been
initialized.
+ runtimeContext.getReplicationManager().startReplicationThreads();
+ }
}
@Override
@@ -204,8 +211,7 @@
public void notifyStartupComplete() throws Exception {
//Send max resource id on this NC to the CC
ReportMaxResourceIdMessage.send((NodeControllerService)
ncApplicationContext.getControllerService());
- MetadataProperties metadataProperties = ((IPropertiesProvider)
runtimeContext)
- .getMetadataProperties();
+ MetadataProperties metadataProperties = ((IPropertiesProvider)
runtimeContext).getMetadataProperties();
if (initialRun || systemState == SystemState.NEW_UNIVERSE) {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("System state: " + SystemState.NEW_UNIVERSE);
@@ -214,9 +220,8 @@
LOGGER.info("Root Metadata Store: " +
metadataProperties.getStores().get(nodeId)[0]);
}
- PersistentLocalResourceRepository localResourceRepository =
- (PersistentLocalResourceRepository) runtimeContext
- .getLocalResourceRepository();
+ PersistentLocalResourceRepository localResourceRepository =
(PersistentLocalResourceRepository) runtimeContext
+ .getLocalResourceRepository();
localResourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName());
}
@@ -281,8 +286,7 @@
}
private void updateOnNodeJoin() {
- MetadataProperties metadataProperties = ((IPropertiesProvider)
runtimeContext)
- .getMetadataProperties();
+ MetadataProperties metadataProperties = ((IPropertiesProvider)
runtimeContext).getMetadataProperties();
if (!metadataProperties.getNodeNames().contains(nodeId)) {
metadataProperties.getNodeNames().add(nodeId);
Cluster cluster = ClusterProperties.INSTANCE.getCluster();
@@ -290,8 +294,7 @@
throw new IllegalStateException("No cluster configuration
found for this instance");
}
String asterixInstanceName = metadataProperties.getInstanceName();
- TransactionProperties txnProperties = ((IPropertiesProvider)
runtimeContext)
- .getTransactionProperties();
+ TransactionProperties txnProperties = ((IPropertiesProvider)
runtimeContext).getTransactionProperties();
Node self = null;
List<Node> nodes;
if (cluster.getSubstituteNodes() != null) {
diff --git a/asterixdb/asterix-app/src/main/resources/cluster.xml
b/asterixdb/asterix-app/src/main/resources/cluster.xml
index 8f0b694..3173b57 100644
--- a/asterixdb/asterix-app/src/main/resources/cluster.xml
+++ b/asterixdb/asterix-app/src/main/resources/cluster.xml
@@ -21,10 +21,10 @@
<store>storage</store>
<data_replication>
- <enabled>false</enabled>
+ <enabled>true</enabled>
<replication_port>2016</replication_port>
<replication_factor>2</replication_factor>
- <auto_failover>false</auto_failover>
+ <auto_failover>true</auto_failover>
<replication_time_out>30</replication_time_out>
</data_replication>
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
new file mode 100644
index 0000000..fb98e1d
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import java.util.Map;
+
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IClusterStateManager {
+
+ /**
+ * @return The current cluster state.
+ */
+ ClusterState getState();
+
+ /**
+ * Updates the cluster state based on the state of all cluster partitions
and the metadata node.
+ */
+ void updateClusterState() throws HyracksDataException;
+
+ /**
+ * Force the cluster state into {@code state}
+ */
+ void forceIntoState(ClusterState state);
+
+ /**
+ * Updates all partitions of {@code nodeId} based on the {@code active}
flag.
+ * @param nodeId
+ * @param active
+ * @throws HyracksDataException
+ */
+ void updateNodePartitions(String nodeId, boolean active) throws
HyracksDataException;
+
+ /**
+ * Updates the active node and active state of the cluster partition with
id {@code partitionNum}
+ */
+ void updateClusterPartition(Integer partitionNum, String activeNode,
boolean active);
+
+ /**
+ * Updates the metadata node id and its state.
+ */
+ void updateMetadataNode(String nodeId, boolean active);
+
+ /**
+ * @return a map of nodeId and NC Configuration for active nodes.
+ */
+ Map<String, Map<String, String>> getActiveNcConfiguration();
+
+ /**
+ * @return The current metadata node Id.
+ */
+ String getCurrentMetadataNodeId();
+
+ ClusterPartition[] getNodePartitions(String nodeId);
+
+ ClusterPartition[] getClusterPartitons();
+
+}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
index 81c5a6d..85b24f6 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
@@ -25,6 +25,7 @@
import javax.xml.bind.Unmarshaller;
import org.apache.asterix.event.schema.cluster.Cluster;
+import org.apache.commons.lang3.StringUtils;
public class ClusterProperties {
@@ -32,7 +33,7 @@
private static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml";
private static final String DEFAULT_STORAGE_DIR_NAME = "storage";
-
+ private static String NODE_NAME_PREFIX;
private final Cluster cluster;
private ClusterProperties() {
@@ -42,11 +43,13 @@
JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
Unmarshaller unmarshaller = ctx.createUnmarshaller();
cluster = (Cluster) unmarshaller.unmarshal(is);
+ NODE_NAME_PREFIX = cluster.getInstanceName() + "_";
} catch (JAXBException e) {
throw new IllegalStateException("Failed to read configuration
file " + CLUSTER_CONFIGURATION_FILE, e);
}
} else {
cluster = null;
+ NODE_NAME_PREFIX = StringUtils.EMPTY;
}
}
@@ -72,4 +75,11 @@
public boolean isAutoFailoverEnabled() {
return isReplicationEnabled() &&
cluster.getDataReplication().isAutoFailover();
}
+
+ public String getNodeFullName(String nodeId) {
+ if (nodeId.startsWith(NODE_NAME_PREFIX)) {
+ return nodeId;
+ }
+ return NODE_NAME_PREFIX + nodeId;
+ }
}
\ No newline at end of file
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
index 164a525..78c5114 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ReplicationProperties.java
@@ -18,11 +18,12 @@
*/
package org.apache.asterix.common.config;
-import java.util.HashSet;
import java.util.Set;
-import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import
org.apache.asterix.common.replication.ChainedDeclusteringReplicationStrategy;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.replication.Replica;
import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.Node;
@@ -32,7 +33,6 @@
public class ReplicationProperties extends AbstractProperties {
private static final Logger LOGGER =
Logger.getLogger(ReplicationProperties.class.getName());
-
private static final int REPLICATION_DATAPORT_DEFAULT = 2000;
@@ -44,8 +44,7 @@
private static final String REPLICATION_TIMEOUT_KEY =
"replication.timeout";
private static final int REPLICATION_TIME_OUT_DEFAULT = 15;
- private static final String REPLICATION_MAX_REMOTE_RECOVERY_ATTEMPTS_KEY =
- "replication.max.remote.recovery.attempts";
+ private static final String REPLICATION_MAX_REMOTE_RECOVERY_ATTEMPTS_KEY =
"replication.max.remote.recovery.attempts";
private static final int MAX_REMOTE_RECOVERY_ATTEMPTS = 5;
private static final String NODE_IP_ADDRESS_DEFAULT = "127.0.0.1";
@@ -60,20 +59,14 @@
private static final int REPLICATION_LOG_BUFFER_PAGE_SIZE_DEFAULT =
StorageUtil.getSizeInBytes(128,
StorageUnit.KILOBYTE);
- private final String nodeNamePrefix;
private final Cluster cluster;
public ReplicationProperties(PropertiesAccessor accessor) {
super(accessor);
this.cluster = ClusterProperties.INSTANCE.getCluster();
-
- if (cluster != null) {
- nodeNamePrefix = cluster.getInstanceName() + "_";
- } else {
- nodeNamePrefix = "";
- }
}
+ //TODO remove this unused method
@PropertyKey(REPLICATION_ENABLED_KEY)
public boolean isReplicationEnabled() {
return ClusterProperties.INSTANCE.isReplicationEnabled();
@@ -83,7 +76,7 @@
if (cluster != null) {
for (int i = 0; i < cluster.getNode().size(); i++) {
Node node = cluster.getNode().get(i);
- if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
+ if
(ClusterProperties.INSTANCE.getNodeFullName(node.getId()).equals(nodeId)) {
return node.getClusterIp();
}
}
@@ -95,7 +88,7 @@
if (cluster != null && cluster.getDataReplication() != null) {
for (int i = 0; i < cluster.getNode().size(); i++) {
Node node = cluster.getNode().get(i);
- if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
+ if
(ClusterProperties.INSTANCE.getNodeFullName(node.getId()).equals(nodeId)) {
return node.getReplicationPort() != null ?
node.getReplicationPort().intValue()
:
cluster.getDataReplication().getReplicationPort().intValue();
}
@@ -105,52 +98,13 @@
}
public Set<Replica> getRemoteReplicas(String nodeId) {
- Set<Replica> remoteReplicas = new HashSet<>();;
-
- int numberOfRemoteReplicas = getReplicationFactor() - 1;
- //Using chained-declustering
- if (cluster != null) {
- int nodeIndex = -1;
- //find the node index in the cluster config
- for (int i = 0; i < cluster.getNode().size(); i++) {
- Node node = cluster.getNode().get(i);
- if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
- nodeIndex = i;
- break;
- }
- }
-
- if (nodeIndex == -1) {
- LOGGER.log(Level.WARNING,
- "Could not find node " + getRealCluserNodeID(nodeId) +
" in cluster configurations");
- return null;
- }
-
- //find nodes to the right of this node
- for (int i = nodeIndex + 1; i < cluster.getNode().size(); i++) {
- remoteReplicas.add(getReplicaByNodeIndex(i));
- if (remoteReplicas.size() == numberOfRemoteReplicas) {
- break;
- }
- }
-
- //if not all remote replicas have been found, start from the
beginning
- if (remoteReplicas.size() != numberOfRemoteReplicas) {
- for (int i = 0; i < cluster.getNode().size(); i++) {
- remoteReplicas.add(getReplicaByNodeIndex(i));
- if (remoteReplicas.size() == numberOfRemoteReplicas) {
- break;
- }
- }
- }
- }
- return remoteReplicas;
+ return getReplicationStrategy().getRemoteReplicas(nodeId);
}
private Replica getReplicaByNodeIndex(int nodeIndex) {
Node node = cluster.getNode().get(nodeIndex);
Node replicaNode = new Node();
- replicaNode.setId(getRealCluserNodeID(node.getId()));
+
replicaNode.setId(ClusterProperties.INSTANCE.getNodeFullName(node.getId()));
replicaNode.setClusterIp(node.getClusterIp());
return new Replica(replicaNode);
}
@@ -161,7 +115,7 @@
for (int i = 0; i < cluster.getNode().size(); i++) {
Node node = cluster.getNode().get(i);
- if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
+ if
(ClusterProperties.INSTANCE.getNodeFullName(node.getId()).equals(nodeId)) {
nodeIndex = i;
break;
}
@@ -176,25 +130,15 @@
}
public Set<String> getRemoteReplicasIds(String nodeId) {
- Set<String> remoteReplicasIds = new HashSet<>();
- Set<Replica> remoteReplicas = getRemoteReplicas(nodeId);
-
- for (Replica replica : remoteReplicas) {
- remoteReplicasIds.add(replica.getId());
- }
-
- return remoteReplicasIds;
- }
-
- public String getRealCluserNodeID(String nodeId) {
- return nodeNamePrefix + nodeId;
+ return
getReplicationStrategy().getRemoteReplicas(nodeId).stream().map(Replica::getId)
+ .collect(Collectors.toSet());
}
public Set<String> getNodeReplicasIds(String nodeId) {
- Set<String> replicaIds = new HashSet<>();
- replicaIds.add(nodeId);
- replicaIds.addAll(getRemoteReplicasIds(nodeId));
- return replicaIds;
+ Set<String> remoteReplicasIds = getRemoteReplicasIds(nodeId);
+ // This includes the node itself
+ remoteReplicasIds.add(nodeId);
+ return remoteReplicasIds;
}
@PropertyKey(REPLICATION_FACTOR_KEY)
@@ -214,49 +158,6 @@
return
cluster.getDataReplication().getReplicationTimeOut().intValue();
}
return REPLICATION_TIME_OUT_DEFAULT;
- }
-
- /**
- * @param nodeId
- * @return The set of nodes which replicate to this node, including the
node itself
- */
- public Set<String> getNodeReplicationClients(String nodeId) {
- Set<String> clientReplicas = new HashSet<>();
- clientReplicas.add(nodeId);
-
- int clientsCount = getReplicationFactor();
-
- //Using chained-declustering backwards
- if (cluster != null) {
- int nodeIndex = -1;
- //find the node index in the cluster config
- for (int i = 0; i < cluster.getNode().size(); i++) {
- Node node = cluster.getNode().get(i);
- if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
- nodeIndex = i;
- break;
- }
- }
-
- //find nodes to the left of this node
- for (int i = nodeIndex - 1; i >= 0; i--) {
- clientReplicas.add(getReplicaByNodeIndex(i).getId());
- if (clientReplicas.size() == clientsCount) {
- break;
- }
- }
-
- //if not all client replicas have been found, start from the end
- if (clientReplicas.size() != clientsCount) {
- for (int i = cluster.getNode().size() - 1; i >= 0; i--) {
- clientReplicas.add(getReplicaByNodeIndex(i).getId());
- if (clientReplicas.size() == clientsCount) {
- break;
- }
- }
- }
- }
- return clientReplicas;
}
@PropertyKey(REPLICATION_MAX_REMOTE_RECOVERY_ATTEMPTS_KEY)
@@ -281,4 +182,8 @@
return accessor.getProperty(REPLICATION_LOG_BATCH_SIZE_KEY,
REPLICATION_LOG_BATCH_SIZE_DEFAULT,
PropertyInterpreters.getIntegerBytePropertyInterpreter());
}
+
+ public IReplicationStrategy getReplicationStrategy() {
+ return new
ChainedDeclusteringReplicationStrategy(getReplicationFactor());
+ }
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AbstractReplicationStrategy.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AbstractReplicationStrategy.java
new file mode 100644
index 0000000..5bc4585
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AbstractReplicationStrategy.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.event.schema.cluster.Cluster;
+import org.apache.asterix.event.schema.cluster.Node;
+
+/**
+ * Base class for {@link IReplicationStrategy} implementations. It provides a helper that turns an entry of the
+ * cluster configuration into a {@link Replica} and a default {@link #isParticipant(String)} implementation.
+ */
+public abstract class AbstractReplicationStrategy implements IReplicationStrategy {
+
+    /**
+     * Builds a {@link Replica} describing the node stored at {@code nodeIndex} in the cluster configuration
+     * obtained from {@link ClusterProperties}.
+     *
+     * @param nodeIndex
+     *            position of the node in the configured node list
+     * @return a replica whose node carries the full node name and the cluster IP of the configured node
+     */
+    public Replica getReplicaByNodeIndex(int nodeIndex) {
+        final Node configuredNode = ClusterProperties.INSTANCE.getCluster().getNode().get(nodeIndex);
+        final Node replicaNode = new Node();
+        // the replica is identified by the full node name rather than by the raw configured id
+        final String fullNodeName = ClusterProperties.INSTANCE.getNodeFullName(configuredNode.getId());
+        replicaNode.setId(fullNodeName);
+        replicaNode.setClusterIp(configuredNode.getClusterIp());
+        return new Replica(replicaNode);
+    }
+
+    /**
+     * @param nodeId
+     * @return true if {@code nodeId} has at least one remote replica or at least one remote primary replica.
+     *         Otherwise false.
+     */
+    @Override
+    public boolean isParticipant(String nodeId) {
+        if (!getRemoteReplicas(nodeId).isEmpty()) {
+            return true;
+        }
+        return !getRemotePrimaryReplicas(nodeId).isEmpty();
+    }
+}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java
new file mode 100644
index 0000000..bd6f842
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ChainedDeclusteringReplicationStrategy.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.common.transactions.LogSource;
+import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.event.schema.cluster.Cluster;
+import org.apache.asterix.event.schema.cluster.Node;
+
+/**
+ * A replication strategy based on chained declustering over the node order of the cluster configuration:
+ * every node replicates to the {@code replicationFactor - 1} nodes that follow it and holds replicas of the
+ * {@code replicationFactor - 1} nodes that precede it. The node list is treated as a ring, so the neighbors
+ * of the first/last nodes wrap around.
+ */
+public class ChainedDeclusteringReplicationStrategy extends AbstractReplicationStrategy {
+
+    private static final Logger LOGGER = Logger.getLogger(ChainedDeclusteringReplicationStrategy.class.getName());
+    private final int replicationFactor;
+
+    /**
+     * @param replicationFactor
+     *            total number of copies of each node's data, including the node's own copy
+     */
+    public ChainedDeclusteringReplicationStrategy(int replicationFactor) {
+        this.replicationFactor = replicationFactor;
+    }
+
+    /**
+     * @param logRecord
+     * @return true for every locally produced log record whose type is not {@code LogType.WAIT}.
+     */
+    @Override
+    public boolean isMatch(ILogRecord logRecord) {
+        return logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT;
+    }
+
+    /**
+     * @param filePath
+     * @return always true; this strategy replicates every file.
+     */
+    @Override
+    public boolean isMatch(String filePath) {
+        return true;
+    }
+
+    /**
+     * @param nodeId
+     * @return the (at most) {@code replicationFactor - 1} nodes that follow {@code nodeId} in the cluster
+     *         configuration, wrapping around at the end of the node list. Empty if the node is unknown.
+     */
+    @Override
+    public Set<Replica> getRemoteReplicas(String nodeId) {
+        return getRingNeighbors(nodeId, 1);
+    }
+
+    /**
+     * @param nodeId
+     * @return the (at most) {@code replicationFactor - 1} nodes that precede {@code nodeId} in the cluster
+     *         configuration, wrapping around at the start of the node list. Empty if the node is unknown.
+     */
+    @Override
+    public Set<Replica> getRemotePrimaryReplicas(String nodeId) {
+        return getRingNeighbors(nodeId, -1);
+    }
+
+    /**
+     * Collects the ring neighbors of {@code nodeId}.
+     *
+     * @param nodeId
+     *            the node whose neighbors are requested
+     * @param direction
+     *            {@code 1} to walk to the right (following nodes), {@code -1} to walk to the left
+     * @return the neighbors found; never contains {@code nodeId} itself
+     */
+    private Set<Replica> getRingNeighbors(String nodeId, int direction) {
+        Cluster cluster = ClusterProperties.INSTANCE.getCluster();
+        if (cluster == null) {
+            // no cluster configuration available, hence nothing to replicate to/from
+            return Collections.emptySet();
+        }
+        // NOTE(review): assumes getNodeFullName() accepts both short and full node ids -- confirm
+        String fullNodeID = ClusterProperties.INSTANCE.getNodeFullName(nodeId);
+        int clusterSize = cluster.getNode().size();
+        int nodeIndex = findNodeIndex(cluster, fullNodeID);
+        if (nodeIndex == -1) {
+            LOGGER.log(Level.WARNING, "Could not find node " + fullNodeID + " in cluster configurations");
+            return Collections.emptySet();
+        }
+
+        // never ask for more neighbors than there are other nodes, otherwise the walk would reach the node itself
+        int neighborsCount = Math.min(replicationFactor - 1, clusterSize - 1);
+        Set<Replica> neighbors = new HashSet<>();
+        for (int offset = 1; offset <= neighborsCount; offset++) {
+            // adding clusterSize keeps the dividend positive; Java's % yields a negative result for negative operands
+            int neighborIndex = (nodeIndex + direction * offset + clusterSize) % clusterSize;
+            neighbors.add(getReplicaByNodeIndex(neighborIndex));
+        }
+        return neighbors;
+    }
+
+    /**
+     * @return the position of the node whose full name equals {@code fullNodeID} in the cluster configuration,
+     *         or -1 if there is no such node.
+     */
+    private static int findNodeIndex(Cluster cluster, String fullNodeID) {
+        for (int i = 0; i < cluster.getNode().size(); i++) {
+            Node node = cluster.getNode().get(i);
+            if (ClusterProperties.INSTANCE.getNodeFullName(node.getId()).equals(fullNodeID)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java
new file mode 100644
index 0000000..1d03016
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Callback interface through which a fault tolerance strategy is told about nodes joining or failing.
+ */
+public interface IFaultToleranceStrategy {
+
+ /**
+ * Defines the logic of a {@link IFaultToleranceStrategy} when a node joins the cluster.
+ *
+ * @param nodeId
+ *            the id of the node that joined
+ * @throws HyracksDataException
+ */
+ void notifyNodeJoin(String nodeId) throws HyracksDataException;
+
+ /**
+ * Defines the logic of a {@link IFaultToleranceStrategy} when a node leaves the cluster.
+ *
+ * @param nodeId
+ *            the id of the node that left
+ * @throws HyracksDataException
+ */
+ void notifyNodeFailure(String nodeId) throws HyracksDataException;
+}
\ No newline at end of file
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
new file mode 100644
index 0000000..0bafed6
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategy.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.Set;
+
+import org.apache.asterix.common.transactions.ILogRecord;
+
+/**
+ * Decides which log records and files are replicated and which nodes exchange replicated data.
+ */
+public interface IReplicationStrategy {
+
+ /**
+ * @param logRecord
+ * @return true, if {@code logRecord} should be replicated. Otherwise false.
+ */
+ boolean isMatch(ILogRecord logRecord);
+
+ /**
+ * @param file
+ * @return true, if {@code file} should be replicated. Otherwise false.
+ */
+ boolean isMatch(String file);
+
+ /**
+ * @param nodeId
+ * @return The set of nodes that replicate their data onto {@code nodeId}, i.e. the nodes for which
+ *         {@code nodeId} holds a replica.
+ */
+ Set<Replica> getRemotePrimaryReplicas(String nodeId);
+
+ /**
+ * @param node
+ * @return The set of nodes that {@code node} replicates its data to.
+ */
+ Set<Replica> getRemoteReplicas(String node);
+
+ /**
+ * @param nodeId
+ * @return true if {@code nodeId} has any remote primary replica or remote replica. Otherwise false.
+ */
+ boolean isParticipant(String nodeId);
+}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataNodeFaultToleranceStrategy.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataNodeFaultToleranceStrategy.java
new file mode 100644
index 0000000..60c30cf
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataNodeFaultToleranceStrategy.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Placeholder {@link IFaultToleranceStrategy}: both callbacks are still empty stubs and perform no action.
+ * NOTE(review): presumably intended to handle failures of the metadata node only -- confirm once implemented.
+ */
+public class MetadataNodeFaultToleranceStrategy implements
IFaultToleranceStrategy {
+
+ @Override
+ public void notifyNodeJoin(String nodeId) throws HyracksDataException {
+ // TODO not implemented yet; a node join is currently ignored
+
+ }
+
+ @Override
+ public void notifyNodeFailure(String nodeId) throws HyracksDataException {
+ // TODO not implemented yet; a node failure is currently ignored
+
+ }
+
+}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
new file mode 100644
index 0000000..2606f1e
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.common.transactions.LogSource;
+import org.apache.asterix.common.transactions.LogType;
+
+/**
+ * A replication strategy that only replicates metadata: log records of datasets whose id is below
+ * {@code MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID} and files whose path contains
+ * {@code "/Metadata"}.
+ * <p>
+ * NOTE(review): the participating nodes are hard-coded ("asterix_nc1" replicates to the node at index 1,
+ * "asterix_nc2" holds a replica of the node at index 0); this should be derived from the cluster configuration.
+ */
+public class MetadataOnlyReplicationStrategy extends AbstractReplicationStrategy {
+
+    private static final Logger LOGGER = Logger.getLogger(MetadataOnlyReplicationStrategy.class.getName());
+
+    /**
+     * @param logRecord
+     * @return true only for local ENTITY_COMMIT, UPSERT_ENTITY_COMMIT, UPDATE and FLUSH log records that belong
+     *         to a dataset with an id below the first available user dataset id. Otherwise false.
+     */
+    @Override
+    public boolean isMatch(ILogRecord logRecord) {
+        if (logRecord.getLogSource() != LogSource.LOCAL || logRecord.getLogType() == LogType.WAIT) {
+            return false;
+        }
+        switch (logRecord.getLogType()) {
+            case LogType.ENTITY_COMMIT:
+            case LogType.UPSERT_ENTITY_COMMIT:
+            case LogType.UPDATE:
+            case LogType.FLUSH:
+                // candidate log type; the dataset id decides below
+                break;
+            default:
+                return false;
+        }
+        return logRecord.getDatasetId() < MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID;
+    }
+
+    /**
+     * @param filePath
+     * @return true if {@code filePath} contains {@code "/Metadata"}. Otherwise false.
+     */
+    @Override
+    public boolean isMatch(String filePath) {
+        return filePath.indexOf("/Metadata") >= 0;
+    }
+
+    /**
+     * @param nodeId
+     * @return for "asterix_nc1" a set holding the replica at cluster index 1; an empty set for any other node.
+     */
+    @Override
+    public Set<Replica> getRemoteReplicas(String nodeId) {
+        final Set<Replica> targets = new HashSet<>();
+        if (!nodeId.equals("asterix_nc1")) {
+            return targets;
+        }
+        targets.add(getReplicaByNodeIndex(1));
+        return targets;
+    }
+
+    /**
+     * @param nodeId
+     * @return for "asterix_nc2" a set holding the replica at cluster index 0; an empty set for any other node.
+     */
+    @Override
+    public Set<Replica> getRemotePrimaryReplicas(String nodeId) {
+        final Set<Replica> primaries = new HashSet<>();
+        if (!nodeId.equals("asterix_nc2")) {
+            return primaries;
+        }
+        primaries.add(getReplicaByNodeIndex(0));
+        return primaries;
+    }
+}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoFaultToleranceStrategy.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoFaultToleranceStrategy.java
new file mode 100644
index 0000000..df29144
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoFaultToleranceStrategy.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An {@link IFaultToleranceStrategy} whose callbacks currently perform no action.
+ * NOTE(review): the bodies are still auto-generated stubs; presumably doing nothing is the intended
+ * behavior for this strategy -- confirm and drop the TODO markers if so.
+ */
+public class NoFaultToleranceStrategy implements IFaultToleranceStrategy {
+
+ @Override
+ public void notifyNodeJoin(String nodeId) throws HyracksDataException {
+ // TODO stub: a node join is currently ignored
+
+ }
+
+ @Override
+ public void notifyNodeFailure(String nodeId) throws HyracksDataException {
+ // TODO stub: a node failure is currently ignored
+
+ }
+}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
new file mode 100644
index 0000000..40caf3d
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/NoReplicationStrategy.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.asterix.common.transactions.ILogRecord;
+
+/**
+ * An {@link IReplicationStrategy} that disables replication: nothing matches, no node participates and
+ * every node has empty replica sets.
+ */
+public class NoReplicationStrategy implements IReplicationStrategy {
+
+    /**
+     * @return always false; no node takes part in replication.
+     */
+    @Override
+    public boolean isParticipant(String nodeId) {
+        return false;
+    }
+
+    /**
+     * @return always false; no log record is replicated.
+     */
+    @Override
+    public boolean isMatch(ILogRecord logRecord) {
+        return false;
+    }
+
+    /**
+     * @return always false; no file is replicated.
+     */
+    @Override
+    public boolean isMatch(String filePath) {
+        return false;
+    }
+
+    /**
+     * @return always an empty set.
+     */
+    @Override
+    public Set<Replica> getRemoteReplicas(String node) {
+        return Collections.<Replica> emptySet();
+    }
+
+    /**
+     * @return always an empty set.
+     */
+    @Override
+    public Set<Replica> getRemotePrimaryReplicas(String nodeId) {
+        return Collections.<Replica> emptySet();
+    }
+}
diff --git
a/asterixdb/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
b/asterixdb/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
index 0f6b528..9de0f04 100644
---
a/asterixdb/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
+++
b/asterixdb/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
@@ -17,7 +17,7 @@
! under the License.
!-->
<test-suite xmlns="urn:xml.testframework.asterix.apache.org"
ResultOffsetPath="results" QueryOffsetPath="queries" QueryFileExtension=".aql">
- <test-group name="failover">
+<!-- <test-group name="failover">
<test-case FilePath="failover">
<compilation-unit name="bulkload">
<output-dir compare="Text">bulkload</output-dir>
@@ -33,7 +33,7 @@
<output-dir compare="Text">metadata_node</output-dir>
</compilation-unit>
</test-case>
- </test-group>
+ </test-group> -->
<test-group name="failback">
<test-case FilePath="failback">
<compilation-unit name="node_failback">
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 529a660..368622c 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -44,8 +44,8 @@
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.context.IndexInfo;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
@@ -122,7 +122,7 @@
Map<String, ClusterPartition[]> nodePartitions =
((IPropertiesProvider)
asterixAppRuntimeContextProvider.getAppContext()).getMetadataProperties()
.getNodePartitions();
- Set<String> nodeReplicationClients =
replicationProperties.getNodeReplicationClients(nodeId);
+ Set<String> nodeReplicationClients =
replicationProperties.getNodeReplicasIds(nodeId);
List<Integer> clientsPartitions = new ArrayList<>();
for (String clientId : nodeReplicationClients) {
for (ClusterPartition clusterPartition :
nodePartitions.get(clientId)) {
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index dc4a93a..59b1d60 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
+import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@@ -50,18 +51,20 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.dataflow.LSMIndexUtil;
-import org.apache.asterix.common.replication.ReplicationJob;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.replication.Replica;
import org.apache.asterix.common.replication.Replica.ReplicaState;
import org.apache.asterix.common.replication.ReplicaEvent;
+import org.apache.asterix.common.replication.ReplicationJob;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.ILogRecord;
@@ -103,7 +106,7 @@
private final Map<Integer, Set<String>> jobCommitAcks;
private final Map<Integer, ILogRecord> replicationJobsPendingAcks;
private ByteBuffer dataBuffer;
-
+ private static final boolean DEBUG = false;
private final LinkedBlockingQueue<IReplicationJob> replicationJobsQ;
private final LinkedBlockingQueue<ReplicaEvent> replicaEventsQ;
@@ -124,8 +127,8 @@
private ReplicationJobsProccessor replicationJobsProcessor;
private final ReplicasEventsMonitor replicationMonitor;
//dummy job used to stop ReplicationJobsProccessor thread.
- private static final IReplicationJob REPLICATION_JOB_POISON_PILL = new
ReplicationJob(
- ReplicationJobType.METADATA, ReplicationOperation.REPLICATE,
ReplicationExecutionType.ASYNC, null);
+ private static final IReplicationJob REPLICATION_JOB_POISON_PILL = new
ReplicationJob(ReplicationJobType.METADATA,
+ ReplicationOperation.REPLICATE, ReplicationExecutionType.ASYNC,
null);
//used to identify the correct IP address when the node has multiple
network interfaces
private String hostIPAddressFirstOctet = null;
@@ -136,6 +139,7 @@
private Future<? extends Object> txnLogReplicatorTask;
private SocketChannel[] logsRepSockets;
private final ByteBuffer txnLogsBatchSizeBuffer =
ByteBuffer.allocate(Integer.BYTES);
+ private final IReplicationStrategy replicationStrategy;
//TODO this class needs to be refactored by moving its private classes to
separate files
//and possibly using MessageBroker to send/receive remote replicas events.
@@ -144,35 +148,36 @@
IAppRuntimeContextProvider asterixAppRuntimeContextProvider) {
this.nodeId = nodeId;
this.replicationProperties = replicationProperties;
+ replicationStrategy = replicationProperties.getReplicationStrategy();
this.replicaResourcesManager = (ReplicaResourcesManager)
remoteResoucesManager;
this.asterixAppRuntimeContextProvider =
asterixAppRuntimeContextProvider;
- this.hostIPAddressFirstOctet =
replicationProperties.getReplicaIPAddress(nodeId).substring(0, 3);
this.logManager = logManager;
+ this.hostIPAddressFirstOctet =
replicationProperties.getReplicaIPAddress(nodeId).substring(0, 3);
+ replicas = new HashMap<>();
replicationJobsQ = new LinkedBlockingQueue<>();
replicaEventsQ = new LinkedBlockingQueue<>();
terminateJobsReplication = new AtomicBoolean(false);
jobsReplicationSuspended = new AtomicBoolean(true);
replicationSuspended = new AtomicBoolean(true);
- replicas = new HashMap<>();
jobCommitAcks = new ConcurrentHashMap<>();
replicationJobsPendingAcks = new ConcurrentHashMap<>();
shuttingDownReplicaIds = new HashSet<>();
dataBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
+ replicationMonitor = new ReplicasEventsMonitor();
+ //add list of replicas from configurations (To be read from another
source e.g. Zookeeper)
+ Set<Replica> replicaNodes =
replicationProperties.getReplicationStrategy().getRemoteReplicas(nodeId);
//Used as async listeners from replicas
replicationListenerThreads = Executors.newCachedThreadPool();
replicationJobsProcessor = new ReplicationJobsProccessor();
- replicationMonitor = new ReplicasEventsMonitor();
Map<String, ClusterPartition[]> nodePartitions =
((IPropertiesProvider) asterixAppRuntimeContextProvider
.getAppContext()).getMetadataProperties().getNodePartitions();
- //add list of replicas from configurations (To be read from another
source e.g. Zookeeper)
- Set<Replica> replicaNodes =
replicationProperties.getRemoteReplicas(nodeId);
replica2PartitionsMap = new HashMap<>(replicaNodes.size());
for (Replica replica : replicaNodes) {
- replicas.put(replica.getNode().getId(), replica);
+
replicas.put(ClusterProperties.INSTANCE.getNodeFullName(replica.getNode().getId()),
replica);
//for each remote replica, get the list of replication clients
- Set<String> nodeReplicationClients =
replicationProperties.getNodeReplicationClients(replica.getId());
+ Set<String> nodeReplicationClients =
replicationProperties.getNodeReplicasIds(replica.getId());
//get the partitions of each client
List<Integer> clientPartitions = new ArrayList<>();
for (String clientId : nodeReplicationClients) {
@@ -278,8 +283,13 @@
//all of the job's files belong to a single storage partition.
//get any of them to determine the partition from the file path.
String jobFile = job.getJobFiles().iterator().next();
+ if (!replicationStrategy.isMatch(jobFile)) {
+ return;
+ }
+
int jobPartitionId =
PersistentLocalResourceRepository.getResourcePartition(jobFile);
+ // TODO apply policy here
ByteBuffer responseBuffer = null;
LSMIndexFileProperties asterixFileProperties = new
LSMIndexFileProperties();
if (requestBuffer == null) {
@@ -906,25 +916,129 @@
*/
@Override
public void stop(boolean dumpState, OutputStream ouputStream) throws
IOException {
- //stop replication thread afters all jobs/logs have been processed
- suspendReplication(false);
- //send shutdown event to remote replicas
- sendShutdownNotifiction();
- //wait until all shutdown events come from all remote replicas
- synchronized (shuttingDownReplicaIds) {
- while
(!shuttingDownReplicaIds.containsAll(getActiveReplicasIds())) {
- try {
- shuttingDownReplicaIds.wait(1000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ if (DEBUG) {
+ try (PrintWriter writer = new PrintWriter(nodeId +
"_rep_manager.txt", "UTF-8")) {
+ writer.println("suspending Replication shutdown");
+ writer.flush();
+ //stop replication thread afters all jobs/logs have been
processed
+ suspendReplication(false);
+
+ writer.println("Replication suspendied");
+ writer.flush();
+
+ /**
+ * If this node has any remote replicas, it needs to inform
them
+ * that it is shutting down.
+ */
+ if (!replicationStrategy.getRemoteReplicas(nodeId).isEmpty()) {
+ writer.println(" sending shutdown to ");
+ printSet(writer,
replicationStrategy.getRemoteReplicas(nodeId));
+ //send shutdown event to remote replicas
+ sendShutdownNotifiction();
+ } else {
+ writer.println(nodeId + " no need to send shutdown...");
+ }
+ writer.flush();
+
+ /**
+ * If this node has any remote primary replicas, then it needs
to wait
+ * until all of them send the shutdown notification.
+ */
+ // Active replicas
+ Set<String> activeReplicasIds = getActiveReplicasIds();
+ Set<String> remotePrimaryReplicasIds =
replicationStrategy.getRemotePrimaryReplicas(nodeId).stream()
+ .map(Replica::getId).collect(Collectors.toSet());
+
+ // find active remote primary replicas
+ Set<String> activeRemotePrimaryReplicas = new HashSet<>();
+ for (String replicaId : remotePrimaryReplicasIds) {
+ if (activeReplicasIds.contains(replicaId)) {
+ activeRemotePrimaryReplicas.add(replicaId);
+ }
+ }
+ if (!activeRemotePrimaryReplicas.isEmpty()) {
+ writer.println("Waiting for shutdown from: " +
activeRemotePrimaryReplicas.toString());
+ writer.flush();
+ //wait until all shutdown events come from all remote
primary replicas
+ synchronized (shuttingDownReplicaIds) {
+ while
(!shuttingDownReplicaIds.containsAll(activeRemotePrimaryReplicas)) {
+ try {
+ shuttingDownReplicaIds.wait();
+ writer.println("Current notify: " +
shuttingDownReplicaIds.toString());
+ writer.flush();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ } else {
+ writer.println("No need to wait for shutdown
notification");
+ writer.flush();
+ }
+
+ LOGGER.log(Level.INFO, "Got shutdown notification from all
remote replicas");
+ writer.println("Got shutdown notification from all remote
replicas");
+ //close replication channel
+
asterixAppRuntimeContextProvider.getAppContext().getReplicationChannel().close();
+
+ LOGGER.log(Level.INFO, "Replication manager stopped.");
+ }
+ } else {
+ //stop replication thread afters all jobs/logs have been processed
+ suspendReplication(false);
+
+ /**
+ * If this node has any remote replicas, it needs to inform them
+ * that it is shutting down.
+ */
+ if (!replicationStrategy.getRemoteReplicas(nodeId).isEmpty()) {
+ //send shutdown event to remote replicas
+ sendShutdownNotifiction();
+ }
+
+ /**
+ * If this node has any remote primary replicas, then it needs to
wait
+ * until all of them send the shutdown notification.
+ */
+ // Active replicas
+ Set<String> activeReplicasIds = getActiveReplicasIds();
+ Set<String> remotePrimaryReplicasIds =
replicationStrategy.getRemotePrimaryReplicas(nodeId).stream()
+ .map(Replica::getId).collect(Collectors.toSet());
+
+ // find active remote primary replicas
+ Set<String> activeRemotePrimaryReplicas = new HashSet<>();
+ for (String replicaId : remotePrimaryReplicasIds) {
+ if (activeReplicasIds.contains(replicaId)) {
+ activeRemotePrimaryReplicas.add(replicaId);
}
}
- }
- LOGGER.log(Level.INFO, "Got shutdown notification from all remote
replicas");
- //close replication channel
-
asterixAppRuntimeContextProvider.getAppContext().getReplicationChannel().close();
+ if (!activeRemotePrimaryReplicas.isEmpty()) {
+ //wait until all shutdown events come from all remote primary
replicas
+ synchronized (shuttingDownReplicaIds) {
+ // TODO it should check for active ones only
+ while
(!shuttingDownReplicaIds.containsAll(activeRemotePrimaryReplicas)) {
+ try {
+ shuttingDownReplicaIds.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
- LOGGER.log(Level.INFO, "Replication manager stopped.");
+ LOGGER.log(Level.INFO, "Got shutdown notification from all remote
replicas");
+ //close replication channel
+
asterixAppRuntimeContextProvider.getAppContext().getReplicationChannel().close();
+
+ LOGGER.log(Level.INFO, "Replication manager stopped.");
+ }
+ }
+
+ /**
+  * Prints the id of every replica in {@code replicas} on its own line and flushes {@code writer}.
+  */
+ private void printSet(PrintWriter writer, Set<Replica> replicas) {
+     for (Replica replica : replicas) {
+         writer.println(replica.getId());
+     }
+     writer.flush();
 }
@Override
@@ -996,6 +1110,9 @@
public void requestFlushLaggingReplicaIndexes(long
nonSharpCheckpointTargetLSN) throws IOException {
long startLSN = logManager.getAppendLSN();
Set<String> replicaIds = getActiveReplicasIds();
+ if (replicaIds.isEmpty()) {
+ return;
+ }
ByteBuffer requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
for (String replicaId : replicaIds) {
//1. identify replica indexes with LSN less than
nonSharpCheckpointTargetLSN.
@@ -1201,6 +1318,7 @@
}
public void handleReplicaFailure(String replicaId) throws
InterruptedException {
+ System.out.println("failed replica: " + replicaId);
Replica replica = replicas.get(replicaId);
if (replica.getState() == ReplicaState.DEAD) {
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index 0dcdc7b..3dc0214 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -31,9 +31,9 @@
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.asterix.common.replication.IReplicationManager;
@@ -61,11 +61,11 @@
//1. identify which replicas reside in this node
String localNodeId = runtimeContext.getTransactionSubsystem().getId();
- Set<String> nodes =
replicationProperties.getNodeReplicationClients(localNodeId);
-
+ Set<String> nodes =
replicationProperties.getNodeReplicasIds(localNodeId);
+ // add the node itself
Map<String, Set<String>> recoveryCandidates = new HashMap<>();
Map<String, Integer> candidatesScore = new HashMap<>();
-
+
//2. identify which nodes has backup per lost node data
for (String node : nodes) {
Set<String> locations =
replicationProperties.getNodeReplicasIds(node);
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/replication/AutoFaultToleranceStrategy.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/replication/AutoFaultToleranceStrategy.java
new file mode 100644
index 0000000..2e740b0
--- /dev/null
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/replication/AutoFaultToleranceStrategy.java
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.replication;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IFaultToleranceStrategy;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.replication.Replica;
+import org.apache.asterix.runtime.message.CompleteFailbackRequestMessage;
+import org.apache.asterix.runtime.message.CompleteFailbackResponseMessage;
+import org.apache.asterix.runtime.message.NodeFailbackPlan;
+import org.apache.asterix.runtime.message.NodeFailbackPlan.FailbackPlanState;
+import
org.apache.asterix.runtime.message.PreparePartitionsFailbackRequestMessage;
+import
org.apache.asterix.runtime.message.PreparePartitionsFailbackResponseMessage;
+import org.apache.asterix.runtime.message.ReplicaEventMessage;
+import org.apache.asterix.runtime.message.TakeoverMetadataNodeRequestMessage;
+import org.apache.asterix.runtime.message.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.runtime.message.TakeoverPartitionsRequestMessage;
+import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage;
+import org.apache.asterix.runtime.util.AppContextInfo;
+import
org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy {
+
+    private static final Logger LOGGER = Logger.getLogger(AutoFaultToleranceStrategy.class.getName());
+    // key of the NC configuration entry read when a joining node's address is sent to its replicas
+    private static final String CLUSTER_NET_IP_ADDRESS_KEY = "cluster-net-ip-address";
+
+    // id given to the next partitions takeover request
+    private long clusterRequestId = 0;
+    // nodes that failed and have not completed a failback yet
+    private final Set<String> failedNodes = new HashSet<>();
+    // failback plans that were prepared but not processed yet
+    private final LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans = new LinkedList<>();
+    // failback plans that are still tracked, keyed by plan id
+    private final Map<Long, NodeFailbackPlan> planId2FailbackPlanMap = new HashMap<>();
+    // takeover requests that were issued and not answered yet, keyed by request id
+    private final Map<Long, TakeoverPartitionsRequestMessage> pendingTakeoverRequests = new HashMap<>();
+
+    private final IClusterStateManager clusterManager;
+    private final ICCMessageBroker messageBroker;
+    private final IReplicationStrategy replicationStrategy;
+    private String currentMetadataNode;
+    private boolean metadataNodeActive;
+
+    /**
+     * Creates a strategy that starts with the metadata node reported by the
+     * cluster manager and treats it as inactive until it joins or a takeover
+     * response is processed.
+     *
+     * @param replicationStrategy used to find the replicas impacted by a node event
+     * @param clusterManager the cluster state this strategy reads and updates
+     * @param messageBroker used to send application messages to the NCs
+     */
+    public AutoFaultToleranceStrategy(IReplicationStrategy replicationStrategy, IClusterStateManager clusterManager,
+            ICCMessageBroker messageBroker) {
+        this.clusterManager = clusterManager;
+        this.messageBroker = messageBroker;
+        this.replicationStrategy = replicationStrategy;
+        this.currentMetadataNode = clusterManager.getCurrentMetadataNodeId();
+        this.metadataNodeActive = false;
+    }
+
+    /**
+     * Handles a node joining the cluster. A node that is recorded as failed
+     * goes through a failback plan; any other node has its partitions
+     * activated right away.
+     *
+     * @param nodeId the id of the joining node
+     * @throws HyracksDataException if updating the cluster state fails
+     */
+    @Override
+    public void notifyNodeJoin(String nodeId) throws HyracksDataException {
+        if (failedNodes.contains(nodeId)) {
+            // the node stays in failedNodes until its failback plan completes
+            prepareFailbackPlan(nodeId);
+            return;
+        }
+
+        // the node is not in failedNodes here, so there is nothing to remove from it
+        if (nodeId.equals(currentMetadataNode)) {
+            metadataNodeActive = true;
+            clusterManager.updateMetadataNode(currentMetadataNode, metadataNodeActive);
+        }
+        clusterManager.updateNodePartitions(nodeId, true);
+        validateClusterState();
+    }
+
+    /**
+     * Handles the failure of a node.
+     *
+     * @param nodeId the id of the failed node
+     * @throws HyracksDataException if updating the cluster state fails
+     */
+    @Override
+    public void notifyNodeFailure(String nodeId) throws HyracksDataException {
+        if (failedNodes.contains(nodeId)) {
+            // the node was waiting for a failback and failed again before it completed
+            notifyFailbackPlansNodeFailure(nodeId);
+            revertFailedFailbackPlanEffects();
+            return;
+        }
+
+        // an active node failed: deactivate its partitions first
+        failedNodes.add(nodeId);
+        clusterManager.updateNodePartitions(nodeId, false);
+        if (nodeId.equals(currentMetadataNode)) {
+            metadataNodeActive = false;
+            clusterManager.updateMetadataNode(nodeId, metadataNodeActive);
+        }
+        validateClusterState();
+
+        // then inform replicas and running failback plans, and reassign the lost partitions
+        notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE);
+        notifyFailbackPlansNodeFailure(nodeId);
+        requestPartitionsTakeover(nodeId);
+    }
+
+    /**
+     * Sends a {@link ReplicaEventMessage} about {@code nodeId} to each of its
+     * remote primary replicas that is currently active.
+     *
+     * @param nodeId the node the event is about
+     * @param event the cluster event to report
+     */
+    private synchronized void notifyImpactedReplicas(String nodeId, ClusterEventType event) {
+        List<String> impactedReplicaIds = replicationStrategy.getRemotePrimaryReplicas(nodeId).stream()
+                .map(Replica::getId).collect(Collectors.toList());
+        Map<String, Map<String, String>> activeNodes = clusterManager.getActiveNcConfiguration();
+        // a node may join with a new IP address, so only a join event carries the address
+        String joinedNodeAddress = event == ClusterEventType.NODE_JOIN
+                ? activeNodes.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY) : "";
+
+        ReplicaEventMessage eventMessage = new ReplicaEventMessage(nodeId, joinedNodeAddress, event);
+        for (String replicaId : impactedReplicaIds) {
+            if (!activeNodes.containsKey(replicaId)) {
+                // the replica is not alive; skip it
+                continue;
+            }
+            try {
+                messageBroker.sendApplicationMessageToNC(eventMessage, replicaId);
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Failed sending an application message to an NC", e);
+            }
+        }
+    }
+
+    /**
+     * Tells every tracked failback plan that {@code nodeId} has failed.
+     *
+     * @param nodeId the id of the failed node
+     */
+    private synchronized void notifyFailbackPlansNodeFailure(String nodeId) {
+        for (NodeFailbackPlan trackedPlan : planId2FailbackPlanMap.values()) {
+            trackedPlan.notifyNodeFailure(nodeId);
+        }
+    }
+
+    /**
+     * Drops every failback plan in the PENDING_ROLLBACK state and requests a
+     * takeover of the partitions of the plan's node.
+     */
+    private synchronized void revertFailedFailbackPlanEffects() {
+        Iterator<NodeFailbackPlan> plans = planId2FailbackPlanMap.values().iterator();
+        while (plans.hasNext()) {
+            NodeFailbackPlan candidate = plans.next();
+            if (candidate.getState() != FailbackPlanState.PENDING_ROLLBACK) {
+                continue;
+            }
+            //TODO if the failing back node is still active, notify it to construct a new plan for it
+            plans.remove();
+
+            //reassign the partitions that were supposed to be failed back to an active replica
+            requestPartitionsTakeover(candidate.getNodeId());
+        }
+    }
+
+    /**
+     * Assigns each partition of the failed node to an active replica and
+     * sends one takeover request per chosen replica.
+     *
+     * @param failedNodeId the id of the node whose partitions must be taken over
+     */
+    private synchronized void requestPartitionsTakeover(String failedNodeId) {
+        //replica -> list of partitions to takeover
+        Map<String, List<Integer>> takeoverPlan = new HashMap<>();
+        ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties();
+
+        //collect the partitions of the failed NC
+        List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId);
+        if (lostPartitions.isEmpty()) {
+            return;
+        }
+
+        for (ClusterPartition lost : lostPartitions) {
+            //TODO (mhubail) currently this assigns the partition to the first found active replica.
+            //It needs to be modified to consider load balancing.
+            for (String candidate : replicationProperties.getNodeReplicasIds(lost.getNodeId())) {
+                if (addActiveReplica(candidate, lost, takeoverPlan)) {
+                    break;
+                }
+            }
+        }
+
+        if (takeoverPlan.isEmpty()) {
+            //no active replicas were found for the failed node
+            LOGGER.severe("Could not find active replicas for the partitions " + lostPartitions);
+            return;
+        }
+        LOGGER.info("Partitions to recover: " + lostPartitions);
+
+        //For each replica, send a request to takeover the assigned partitions
+        for (Entry<String, List<Integer>> assignment : takeoverPlan.entrySet()) {
+            String replica = assignment.getKey();
+            List<Integer> assignedIds = assignment.getValue();
+            Integer[] partitionsToTakeover = assignedIds.toArray(new Integer[assignedIds.size()]);
+            long requestId = clusterRequestId++;
+            TakeoverPartitionsRequestMessage takeoverRequest =
+                    new TakeoverPartitionsRequestMessage(requestId, replica, partitionsToTakeover);
+            pendingTakeoverRequests.put(requestId, takeoverRequest);
+            try {
+                messageBroker.sendApplicationMessageToNC(takeoverRequest, replica);
+            } catch (Exception e) {
+                /**
+                 * if we fail to send the request, it means the NC we tried to send the request to
+                 * has failed. When the failure notification arrives, we will send any pending request
+                 * that belongs to the failed NC to a different active replica.
+                 */
+                LOGGER.log(Level.WARNING, "Failed to send takeover request: " + takeoverRequest, e);
+            }
+        }
+    }
+
+ private boolean addActiveReplica(String replica, ClusterPartition
partition,
+ Map<String, List<Integer>> partitionRecoveryPlan) {
+ Map<String, Map<String, String>> activeNcConfiguration =
clusterManager.getActiveNcConfiguration();
+ if (activeNcConfiguration.containsKey(replica) &&
!failedNodes.contains(replica)) {
+ if (!partitionRecoveryPlan.containsKey(replica)) {
+ List<Integer> replicaPartitions = new ArrayList<>();
+ replicaPartitions.add(partition.getPartitionId());
+ partitionRecoveryPlan.put(replica, replicaPartitions);
+ } else {
+
partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
+ }
+ return true;
+ }
+ return false;
+ }
+
+    /**
+     * Creates a failback plan for a returning node, registers it as pending,
+     * and then tries to process the pending plans.
+     *
+     * @param failingBackNodeId the id of the node that is failing back
+     */
+    private synchronized void prepareFailbackPlan(String failingBackNodeId) {
+        NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId);
+        pendingProcessingFailbackPlans.add(plan);
+        planId2FailbackPlanMap.put(plan.getPlanId(), plan);
+
+        //get all partitions this node requires to resync
+        ReplicationProperties replicationProperties = AppContextInfo.INSTANCE.getReplicationProperties();
+        Set<String> nodeReplicas = replicationProperties.getNodeReplicasIds(failingBackNodeId);
+        for (String replicaId : nodeReplicas) {
+            ClusterPartition[] nodePartitions = clusterManager.getNodePartitions(replicaId);
+            if (nodePartitions == null) {
+                // the cluster manager has no partitions registered for this replica
+                continue;
+            }
+            for (ClusterPartition partition : nodePartitions) {
+                plan.addParticipant(partition.getActiveNodeId());
+                /**
+                 * if the partition original node is the returning node,
+                 * add it to the list of the partitions which will be failed back
+                 */
+                if (partition.getNodeId().equals(failingBackNodeId)) {
+                    plan.addPartitionToFailback(partition.getPartitionId(), partition.getActiveNodeId());
+                }
+            }
+        }
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Prepared Failback plan: " + plan.toString());
+        }
+
+        processPendingFailbackPlans();
+    }
+
+    /**
+     * Starts the first pending failback plan that is still PREPARING and
+     * discards any plan in PENDING_ROLLBACK found before it. Does nothing
+     * unless the cluster is ACTIVE.
+     */
+    private synchronized void processPendingFailbackPlans() {
+        /**
+         * if the cluster state is not ACTIVE, then failbacks should not be processed
+         * since some partitions are not active
+         */
+        if (clusterManager.getState() != ClusterState.ACTIVE) {
+            return;
+        }
+
+        while (!pendingProcessingFailbackPlans.isEmpty()) {
+            //take the first pending failback plan
+            NodeFailbackPlan nextPlan = pendingProcessingFailbackPlans.pop();
+            /**
+             * A plan at this stage will be in one of two states:
+             * 1. PREPARING -> the participants were selected but we haven't sent any request.
+             * 2. PENDING_ROLLBACK -> a participant failed before we send any requests
+             */
+            if (nextPlan.getState() != FailbackPlanState.PREPARING) {
+                if (nextPlan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+                    //this plan failed before sending any requests -> nothing to rollback
+                    planId2FailbackPlanMap.remove(nextPlan.getPlanId());
+                }
+                continue;
+            }
+
+            //set the partitions that will be failed back as inactive
+            String failbackNode = nextPlan.getNodeId();
+            for (Integer partitionId : nextPlan.getPartitionsToFailback()) {
+                //partition expected to be returned to the failing back node
+                clusterManager.updateClusterPartition(partitionId, failbackNode, false);
+            }
+
+            /**
+             * if the returning node is the original metadata node,
+             * then metadata node will change after the failback completes
+             */
+            String originalMetadataNode = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName();
+            if (originalMetadataNode.equals(failbackNode)) {
+                nextPlan.setNodeToReleaseMetadataManager(currentMetadataNode);
+                currentMetadataNode = "";
+                metadataNodeActive = false;
+                clusterManager.updateMetadataNode(currentMetadataNode, metadataNodeActive);
+            }
+
+            //force new jobs to wait
+            clusterManager.forceIntoState(ClusterState.REBALANCING);
+            handleFailbackRequests(nextPlan, messageBroker);
+            /**
+             * wait until the current plan is completed before processing the next plan.
+             * when the current one completes or is reverted, the cluster state will be
+             * ACTIVE again, and the next failback plan (if any) will be processed.
+             */
+            return;
+        }
+    }
+
+    /**
+     * Sends the plan's prepare-failback requests to the participating nodes.
+     * On the first send failure the target node is reported to the plan as
+     * failed, failed plans are reverted, and no further requests are sent.
+     *
+     * @param plan the failback plan whose requests are sent
+     * @param messageBroker the broker used to reach the NCs
+     */
+    private void handleFailbackRequests(NodeFailbackPlan plan, ICCMessageBroker messageBroker) {
+        //send requests to other nodes to complete on-going jobs and prepare partitions for failback
+        for (PreparePartitionsFailbackRequestMessage prepareRequest : plan.getPlanFailbackRequests()) {
+            try {
+                messageBroker.sendApplicationMessageToNC(prepareRequest, prepareRequest.getNodeID());
+                plan.addPendingRequest(prepareRequest);
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Failed to send failback request to: " + prepareRequest.getNodeID(), e);
+                plan.notifyNodeFailure(prepareRequest.getNodeID());
+                revertFailedFailbackPlanEffects();
+                return;
+            }
+        }
+    }
+
+    /**
+     * Collects the partitions whose active node is {@code nodeId}, plus the
+     * partitions of any pending takeover request addressed to that node.
+     * Those pending requests are removed from the pending set.
+     *
+     * @param nodeId the node whose assigned partitions are requested
+     * @return the partitions currently or about to be served by the node
+     */
+    public synchronized List<ClusterPartition> getNodeAssignedPartitions(String nodeId) {
+        List<ClusterPartition> assigned = new ArrayList<>();
+        Map<Integer, ClusterPartition> partitionsById = new HashMap<>();
+        for (ClusterPartition partition : clusterManager.getClusterPartitons()) {
+            partitionsById.put(partition.getPartitionId(), partition);
+            if (partition.getActiveNodeId().equals(nodeId)) {
+                assigned.add(partition);
+            }
+        }
+
+        /**
+         * if there is any pending takeover request this node was supposed to handle,
+         * it needs to be sent to a different replica
+         */
+        List<Long> orphanedRequestIds = new ArrayList<>();
+        for (TakeoverPartitionsRequestMessage pending : pendingTakeoverRequests.values()) {
+            if (!pending.getNodeId().equals(nodeId)) {
+                continue;
+            }
+            for (Integer partitionId : pending.getPartitions()) {
+                assigned.add(partitionsById.get(partitionId));
+            }
+            orphanedRequestIds.add(pending.getRequestId());
+        }
+
+        //remove failed requests
+        for (Long orphanedId : orphanedRequestIds) {
+            pendingTakeoverRequests.remove(orphanedId);
+        }
+        return assigned;
+    }
+
+    /**
+     * Marks the partitions in a takeover response as active on the responding
+     * node, clears the matching pending request, and re-validates the cluster state.
+     *
+     * @param response the takeover response received from an NC
+     * @throws HyracksDataException if updating the cluster state fails
+     */
+    public synchronized void processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage response)
+            throws HyracksDataException {
+        for (Integer takenOverId : response.getPartitions()) {
+            clusterManager.updateClusterPartition(takenOverId, response.getNodeId(), true);
+        }
+        pendingTakeoverRequests.remove(response.getRequestId());
+        validateClusterState();
+    }
+
+    /**
+     * Records the responding node as the active metadata node and
+     * re-validates the cluster state.
+     *
+     * @param response the metadata node takeover response received from an NC
+     * @throws HyracksDataException if updating the cluster state fails
+     */
+    public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage response)
+            throws HyracksDataException {
+        this.currentMetadataNode = response.getNodeId();
+        this.metadataNodeActive = true;
+        clusterManager.updateMetadataNode(this.currentMetadataNode, this.metadataNodeActive);
+        validateClusterState();
+    }
+
+ private void validateClusterState() throws HyracksDataException {
+ clusterManager.updateClusterState();
+ ClusterState newState = clusterManager.getState();
+ // PENDING: all partitions are active not metadata node is not
+ if (newState == ClusterState.PENDING) {
+ requestMetadataNodeTakeover();
+ } else if (newState == ClusterState.ACTIVE) {
+ processPendingFailbackPlans();
+ }
+ }
+
+    /**
+     * Records a participant's response to a prepare-failback request and
+     * advances the plan: sends the completion request once every response
+     * has arrived, or reverts the plan if it is pending rollback.
+     *
+     * @param msg the prepare-failback response received from an NC
+     */
+    public synchronized void processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage msg) {
+        NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId());
+        if (plan == null) {
+            // the plan was already removed (e.g. reverted); a late response has nothing to update
+            LOGGER.warning("Ignoring failback response for unknown plan: " + msg.getPlanId());
+            return;
+        }
+        plan.markRequestCompleted(msg.getRequestId());
+        /**
+         * A plan at this stage will be in one of three states:
+         * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still expected (wait).
+         * 2. PENDING_COMPLETION -> all responses received (time to send completion request).
+         * 3. PENDING_ROLLBACK -> the plan failed and we just received the final pending response (revert).
+         */
+        if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) {
+            CompleteFailbackRequestMessage request = plan.getCompleteFailbackRequestMessage();
+
+            //send complete resync and takeover partitions to the failing back node
+            try {
+                messageBroker.sendApplicationMessageToNC(request, request.getNodeId());
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, "Failed to send complete failback request to: " + request.getNodeId(), e);
+                notifyFailbackPlansNodeFailure(request.getNodeId());
+                revertFailedFailbackPlanEffects();
+            }
+        } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
+            revertFailedFailbackPlanEffects();
+        }
+    }
+
+    /**
+     * Finalizes a failback plan after the failing back node reports completion.
+     *
+     * @param response the complete-failback response received from the failing back node
+     * @throws HyracksDataException if updating the cluster state fails
+     */
+    public synchronized void processCompleteFailbackResponse(CompleteFailbackResponseMessage response)
+            throws HyracksDataException {
+        /**
+         * the failback plan completed successfully:
+         * Remove all references to it.
+         * Remove the failing back node from the failed nodes list.
+         * Notify its replicas to reconnect to it.
+         * Set the failing back node partitions as active.
+         */
+        NodeFailbackPlan plan = planId2FailbackPlanMap.remove(response.getPlanId());
+        if (plan == null) {
+            // the plan is no longer tracked (e.g. it was reverted); nothing to finalize
+            LOGGER.warning("Ignoring complete failback response for unknown plan: " + response.getPlanId());
+            return;
+        }
+        String nodeId = plan.getNodeId();
+        failedNodes.remove(nodeId);
+        //notify impacted replicas they can reconnect to this node
+        notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
+        clusterManager.updateNodePartitions(nodeId, true);
+        validateClusterState();
+    }
+
+    /**
+     * Asks the node that currently serves the metadata partition to register
+     * itself as the metadata node.
+     */
+    private synchronized void requestMetadataNodeTakeover() {
+        //need a new node to takeover metadata node
+        ClusterPartition metadataPartition = AppContextInfo.INSTANCE.getMetadataProperties().getMetadataPartition();
+        TakeoverMetadataNodeRequestMessage takeoverRequest = new TakeoverMetadataNodeRequestMessage();
+        try {
+            //the active node of the metadata partition is the takeover target
+            messageBroker.sendApplicationMessageToNC(takeoverRequest, metadataPartition.getActiveNodeId());
+        } catch (Exception e) {
+            /**
+             * if we fail to send the request, it means the NC we tried to send the request to
+             * has failed. When the failure notification arrives, a new NC will be assigned to
+             * the metadata partition and a new metadata node takeover request will be sent to it.
+             */
+            LOGGER.log(Level.WARNING,
+                    "Failed to send metadata node takeover request to: " + metadataPartition.getActiveNodeId(), e);
+        }
+    }
+}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
index 3ba1965..4d8006c 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java
@@ -19,13 +19,11 @@
package org.apache.asterix.runtime.util;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.logging.Level;
@@ -33,24 +31,19 @@
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.replication.IFaultToleranceStrategy;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.Node;
-import org.apache.asterix.runtime.message.CompleteFailbackRequestMessage;
import org.apache.asterix.runtime.message.CompleteFailbackResponseMessage;
-import org.apache.asterix.runtime.message.NodeFailbackPlan;
-import org.apache.asterix.runtime.message.NodeFailbackPlan.FailbackPlanState;
-import
org.apache.asterix.runtime.message.PreparePartitionsFailbackRequestMessage;
import
org.apache.asterix.runtime.message.PreparePartitionsFailbackResponseMessage;
-import org.apache.asterix.runtime.message.ReplicaEventMessage;
-import org.apache.asterix.runtime.message.TakeoverMetadataNodeRequestMessage;
import org.apache.asterix.runtime.message.TakeoverMetadataNodeResponseMessage;
-import org.apache.asterix.runtime.message.TakeoverPartitionsRequestMessage;
import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage;
+import org.apache.asterix.runtime.replication.AutoFaultToleranceStrategy;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import
org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.json.JSONException;
@@ -60,7 +53,7 @@
* A holder class for properties related to the Asterix cluster.
*/
-public class ClusterStateManager {
+public class ClusterStateManager implements IClusterStateManager {
/*
* TODO: currently after instance restarts we require all nodes to join
again,
* otherwise the cluster wont be ACTIVE. we may overcome this by storing
the cluster state before the instance
@@ -69,9 +62,9 @@
private static final Logger LOGGER =
Logger.getLogger(ClusterStateManager.class.getName());
public static final ClusterStateManager INSTANCE = new
ClusterStateManager();
- private static final String CLUSTER_NET_IP_ADDRESS_KEY =
"cluster-net-ip-address";
private static final String IO_DEVICES = "iodevices";
- private Map<String, Map<String, String>> activeNcConfiguration = new
HashMap<>();
+ private final Map<String, Map<String, String>> activeNcConfiguration = new
HashMap<>();
+ private Map<String, Map<String, String>> activeNcConfigurationBackup = new
HashMap<>();
private final Cluster cluster;
private ClusterState state = ClusterState.UNUSABLE;
@@ -82,32 +75,25 @@
private Map<String, ClusterPartition[]> node2PartitionsMap = null;
private SortedMap<Integer, ClusterPartition> clusterPartitions = null;
- private Map<Long, TakeoverPartitionsRequestMessage>
pendingTakeoverRequests = null;
- private long clusterRequestId = 0;
private String currentMetadataNode = null;
private boolean metadataNodeActive = false;
- private boolean autoFailover = false;
- private boolean replicationEnabled = false;
private Set<String> failedNodes = new HashSet<>();
- private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans;
- private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap;
+ private IReplicationStrategy replicationStrategy;
+ private ICCMessageBroker messageBroker;
+ private IFaultToleranceStrategy ftStrategy;
private ClusterStateManager() {
cluster = ClusterProperties.INSTANCE.getCluster();
// if this is the CC process
- if (AppContextInfo.INSTANCE.initialized()
- && AppContextInfo.INSTANCE.getCCApplicationContext() != null) {
+ if (AppContextInfo.INSTANCE.initialized() &&
AppContextInfo.INSTANCE.getCCApplicationContext() != null) {
node2PartitionsMap =
AppContextInfo.INSTANCE.getMetadataProperties().getNodePartitions();
clusterPartitions =
AppContextInfo.INSTANCE.getMetadataProperties().getClusterPartitions();
currentMetadataNode =
AppContextInfo.INSTANCE.getMetadataProperties().getMetadataNodeName();
- replicationEnabled =
ClusterProperties.INSTANCE.isReplicationEnabled();
- autoFailover = ClusterProperties.INSTANCE.isAutoFailoverEnabled();
- if (autoFailover) {
- pendingTakeoverRequests = new HashMap<>();
- pendingProcessingFailbackPlans = new LinkedList<>();
- planId2FailbackPlanMap = new HashMap<>();
- }
+ replicationStrategy =
AppContextInfo.INSTANCE.getReplicationProperties().getReplicationStrategy();
+ messageBroker = (ICCMessageBroker)
AppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker();
+ // TODO get from config or factory
+ ftStrategy = new AutoFaultToleranceStrategy(replicationStrategy,
this, messageBroker);
}
}
@@ -115,30 +101,8 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Removing configuration parameters for node id " +
nodeId);
}
- activeNcConfiguration.remove(nodeId);
-
- //if this node was waiting for failback and failed before it completed
- if (failedNodes.contains(nodeId)) {
- if (autoFailover) {
- notifyFailbackPlansNodeFailure(nodeId);
- revertFailedFailbackPlanEffects();
- }
- } else {
- //an active node failed
- failedNodes.add(nodeId);
- if (nodeId.equals(currentMetadataNode)) {
- metadataNodeActive = false;
- LOGGER.info("Metadata node is now inactive");
- }
- updateNodePartitions(nodeId, false);
- if (replicationEnabled) {
- notifyImpactedReplicas(nodeId, ClusterEventType.NODE_FAILURE);
- if (autoFailover) {
- notifyFailbackPlansNodeFailure(nodeId);
- requestPartitionsTakeover(nodeId);
- }
- }
- }
+ activeNcConfigurationBackup.put(nodeId,
activeNcConfiguration.remove(nodeId));
+ ftStrategy.notifyNodeFailure(nodeId);
}
public synchronized void addNCConfiguration(String nodeId, Map<String,
String> configuration)
@@ -146,47 +110,53 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Registering configuration parameters for node id " +
nodeId);
}
+ if (configuration == null) {
+ configuration = activeNcConfigurationBackup.get(nodeId);
+ }
activeNcConfiguration.put(nodeId, configuration);
-
- //a node trying to come back after failure
- if (failedNodes.contains(nodeId)) {
- if (autoFailover) {
- prepareFailbackPlan(nodeId);
- return;
- } else {
- //a node completed local or remote recovery and rejoined
- failedNodes.remove(nodeId);
- if (replicationEnabled) {
- //notify other replica to reconnect to this node
- notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
- }
- }
- }
-
- if (nodeId.equals(currentMetadataNode)) {
- metadataNodeActive = true;
- LOGGER.info("Metadata node is now active");
- }
- updateNodePartitions(nodeId, true);
+ ftStrategy.notifyNodeJoin(nodeId);
}
- private synchronized void updateNodePartitions(String nodeId, boolean
added) throws HyracksDataException {
+ @Override
+ public synchronized void forceIntoState(ClusterState state) {
+ this.state = state;
+ }
+
+ @Override
+ public void updateMetadataNode(String nodeId, boolean active) {
+ currentMetadataNode = nodeId;
+ metadataNodeActive = active;
+ if (active) {
+ LOGGER.info(String.format("Metadata node %s is now active",
currentMetadataNode));
+ }
+ }
+
+ @Override
+ public synchronized void updateNodePartitions(String nodeId, boolean
active) throws HyracksDataException {
ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
// if this isn't a storage node, it will not have cluster partitions
if (nodePartitions != null) {
for (ClusterPartition p : nodePartitions) {
- // set the active node for this node's partitions
- p.setActive(added);
- if (added) {
- p.setActiveNodeId(nodeId);
- }
+ updateClusterPartition(p.getPartitionId(), nodeId, active);
}
- resetClusterPartitionConstraint();
- updateClusterState();
}
}
- private synchronized void updateClusterState() throws HyracksDataException
{
+ @Override
+ public synchronized void updateClusterPartition(Integer partitionNum,
String activeNode, boolean active) {
+ ClusterPartition clusterPartition =
clusterPartitions.get(partitionNum);
+ if (clusterPartition != null) {
+ // set the active node for this node's partitions
+ clusterPartition.setActive(active);
+ if (active) {
+ clusterPartition.setActiveNodeId(activeNode);
+ }
+ }
+ }
+
+ @Override
+ public synchronized void updateClusterState() throws HyracksDataException {
+ resetClusterPartitionConstraint();
for (ClusterPartition p : clusterPartitions.values()) {
if (!p.isActive()) {
state = ClusterState.UNUSABLE;
@@ -194,20 +164,17 @@
return;
}
}
+
+ state = ClusterState.PENDING;
+ LOGGER.info("Cluster is now " + state);
+
// if all storage partitions are active as well as the metadata node,
then the cluster is active
if (metadataNodeActive) {
- state = ClusterState.PENDING;
- LOGGER.info("Cluster is now " + state);
AppContextInfo.INSTANCE.getMetadataBootstrap().init();
state = ClusterState.ACTIVE;
LOGGER.info("Cluster is now " + state);
// start global recovery
AppContextInfo.INSTANCE.getGlobalRecoveryManager().startGlobalRecovery();
- if (autoFailover && !pendingProcessingFailbackPlans.isEmpty()) {
- processPendingFailbackPlans();
- }
- } else {
- requestMetadataNodeTakeover();
}
}
@@ -230,6 +197,7 @@
return ncConfig.get(IO_DEVICES).split(",");
}
+ @Override
public ClusterState getState() {
return state;
}
@@ -285,6 +253,7 @@
return
AppContextInfo.INSTANCE.getMetadataProperties().getNodeNames().size();
}
+ @Override
public synchronized ClusterPartition[] getNodePartitions(String nodeId) {
return node2PartitionsMap.get(nodeId);
}
@@ -296,337 +265,13 @@
return 0;
}
+ @Override
public synchronized ClusterPartition[] getClusterPartitons() {
ArrayList<ClusterPartition> partitons = new ArrayList<>();
for (ClusterPartition partition : clusterPartitions.values()) {
partitons.add(partition);
}
return partitons.toArray(new ClusterPartition[] {});
- }
-
- private synchronized void requestPartitionsTakeover(String failedNodeId) {
- //replica -> list of partitions to takeover
- Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>();
- ReplicationProperties replicationProperties =
AppContextInfo.INSTANCE.getReplicationProperties();
-
- //collect the partitions of the failed NC
- List<ClusterPartition> lostPartitions =
getNodeAssignedPartitions(failedNodeId);
- if (!lostPartitions.isEmpty()) {
- for (ClusterPartition partition : lostPartitions) {
- //find replicas for this partitions
- Set<String> partitionReplicas =
replicationProperties.getNodeReplicasIds(partition.getNodeId());
- //find a replica that is still active
- for (String replica : partitionReplicas) {
- //TODO (mhubail) currently this assigns the partition to
the first found active replica.
- //It needs to be modified to consider load balancing.
- if (addActiveReplica(replica, partition,
partitionRecoveryPlan)) {
- break;
- }
- }
- }
-
- if (partitionRecoveryPlan.size() == 0) {
- //no active replicas were found for the failed node
- LOGGER.severe("Could not find active replicas for the
partitions " + lostPartitions);
- return;
- } else {
- LOGGER.info("Partitions to recover: " + lostPartitions);
- }
- ICCMessageBroker messageBroker = (ICCMessageBroker)
AppContextInfo.INSTANCE.getCCApplicationContext()
- .getMessageBroker();
- //For each replica, send a request to takeover the assigned
partitions
- for (Entry<String, List<Integer>> entry :
partitionRecoveryPlan.entrySet()) {
- String replica = entry.getKey();
- Integer[] partitionsToTakeover = entry.getValue().toArray(new
Integer[entry.getValue().size()]);
- long requestId = clusterRequestId++;
- TakeoverPartitionsRequestMessage takeoverRequest = new
TakeoverPartitionsRequestMessage(requestId,
- replica, partitionsToTakeover);
- pendingTakeoverRequests.put(requestId, takeoverRequest);
- try {
- messageBroker.sendApplicationMessageToNC(takeoverRequest,
replica);
- } catch (Exception e) {
- /**
- * if we fail to send the request, it means the NC we
tried to send the request to
- * has failed. When the failure notification arrives, we
will send any pending request
- * that belongs to the failed NC to a different active
replica.
- */
- LOGGER.log(Level.WARNING, "Failed to send takeover
request: " + takeoverRequest, e);
- }
- }
- }
- }
-
- private boolean addActiveReplica(String replica, ClusterPartition
partition,
- Map<String, List<Integer>> partitionRecoveryPlan) {
- if (activeNcConfiguration.containsKey(replica) &&
!failedNodes.contains(replica)) {
- if (!partitionRecoveryPlan.containsKey(replica)) {
- List<Integer> replicaPartitions = new ArrayList<>();
- replicaPartitions.add(partition.getPartitionId());
- partitionRecoveryPlan.put(replica, replicaPartitions);
- } else {
-
partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
- }
- return true;
- }
- return false;
- }
-
- private synchronized List<ClusterPartition>
getNodeAssignedPartitions(String nodeId) {
- List<ClusterPartition> nodePartitions = new ArrayList<>();
- for (ClusterPartition partition : clusterPartitions.values()) {
- if (partition.getActiveNodeId().equals(nodeId)) {
- nodePartitions.add(partition);
- }
- }
- /**
- * if there is any pending takeover request that this node was
supposed to handle,
- * it needs to be sent to a different replica
- */
- List<Long> failedTakeoverRequests = new ArrayList<>();
- for (TakeoverPartitionsRequestMessage request :
pendingTakeoverRequests.values()) {
- if (request.getNodeId().equals(nodeId)) {
- for (Integer partitionId : request.getPartitions()) {
- nodePartitions.add(clusterPartitions.get(partitionId));
- }
- failedTakeoverRequests.add(request.getRequestId());
- }
- }
-
- //remove failed requests
- for (Long requestId : failedTakeoverRequests) {
- pendingTakeoverRequests.remove(requestId);
- }
- return nodePartitions;
- }
-
- private synchronized void requestMetadataNodeTakeover() {
- //need a new node to takeover metadata node
- ClusterPartition metadataPartiton =
AppContextInfo.INSTANCE.getMetadataProperties()
- .getMetadataPartition();
- //request the metadataPartition node to register itself as the
metadata node
- TakeoverMetadataNodeRequestMessage takeoverRequest = new
TakeoverMetadataNodeRequestMessage();
- ICCMessageBroker messageBroker = (ICCMessageBroker)
AppContextInfo.INSTANCE.getCCApplicationContext()
- .getMessageBroker();
- try {
- messageBroker.sendApplicationMessageToNC(takeoverRequest,
metadataPartiton.getActiveNodeId());
- } catch (Exception e) {
- /**
- * if we fail to send the request, it means the NC we tried to
send the request to
- * has failed. When the failure notification arrives, a new NC
will be assigned to
- * the metadata partition and a new metadata node takeover request
will be sent to it.
- */
- LOGGER.log(Level.WARNING,
- "Failed to send metadata node takeover request to: " +
metadataPartiton.getActiveNodeId(), e);
- }
- }
-
- public synchronized void
processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage response)
- throws HyracksDataException {
- for (Integer partitonId : response.getPartitions()) {
- ClusterPartition partition = clusterPartitions.get(partitonId);
- partition.setActive(true);
- partition.setActiveNodeId(response.getNodeId());
- }
- pendingTakeoverRequests.remove(response.getRequestId());
- resetClusterPartitionConstraint();
- updateClusterState();
- }
-
- public synchronized void
processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage
response)
- throws HyracksDataException {
- currentMetadataNode = response.getNodeId();
- metadataNodeActive = true;
- LOGGER.info("Current metadata node: " + currentMetadataNode);
- updateClusterState();
- }
-
- private synchronized void prepareFailbackPlan(String failingBackNodeId) {
- NodeFailbackPlan plan = NodeFailbackPlan.createPlan(failingBackNodeId);
- pendingProcessingFailbackPlans.add(plan);
- planId2FailbackPlanMap.put(plan.getPlanId(), plan);
-
- //get all partitions this node requires to resync
- ReplicationProperties replicationProperties =
AppContextInfo.INSTANCE.getReplicationProperties();
- Set<String> nodeReplicas =
replicationProperties.getNodeReplicationClients(failingBackNodeId);
- for (String replicaId : nodeReplicas) {
- ClusterPartition[] nodePartitions =
node2PartitionsMap.get(replicaId);
- for (ClusterPartition partition : nodePartitions) {
- plan.addParticipant(partition.getActiveNodeId());
- /**
- * if the partition original node is the returning node,
- * add it to the list of the partitions which will be failed
back
- */
- if (partition.getNodeId().equals(failingBackNodeId)) {
- plan.addPartitionToFailback(partition.getPartitionId(),
partition.getActiveNodeId());
- }
- }
- }
-
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Prepared Failback plan: " + plan.toString());
- }
-
- processPendingFailbackPlans();
- }
-
- private synchronized void processPendingFailbackPlans() {
- /**
- * if the cluster state is not ACTIVE, then failbacks should not be
processed
- * since some partitions are not active
- */
- if (state == ClusterState.ACTIVE) {
- while (!pendingProcessingFailbackPlans.isEmpty()) {
- //take the first pending failback plan
- NodeFailbackPlan plan = pendingProcessingFailbackPlans.pop();
- /**
- * A plan at this stage will be in one of two states:
- * 1. PREPARING -> the participants were selected but we
haven't sent any request.
- * 2. PENDING_ROLLBACK -> a participant failed before we send
any requests
- */
- if (plan.getState() == FailbackPlanState.PREPARING) {
- //set the partitions that will be failed back as inactive
- String failbackNode = plan.getNodeId();
- for (Integer partitionId : plan.getPartitionsToFailback())
{
- ClusterPartition clusterPartition =
clusterPartitions.get(partitionId);
- clusterPartition.setActive(false);
- //partition expected to be returned to the failing
back node
- clusterPartition.setActiveNodeId(failbackNode);
- }
-
- /**
- * if the returning node is the original metadata node,
- * then metadata node will change after the failback
completes
- */
- String originalMetadataNode =
AppContextInfo.INSTANCE.getMetadataProperties()
- .getMetadataNodeName();
- if (originalMetadataNode.equals(failbackNode)) {
-
plan.setNodeToReleaseMetadataManager(currentMetadataNode);
- currentMetadataNode = "";
- metadataNodeActive = false;
- }
-
- //force new jobs to wait
- state = ClusterState.REBALANCING;
- ICCMessageBroker messageBroker = (ICCMessageBroker)
AppContextInfo.INSTANCE
- .getCCApplicationContext().getMessageBroker();
- handleFailbackRequests(plan, messageBroker);
- /**
- * wait until the current plan is completed before
processing the next plan.
- * when the current one completes or is reverted, the
cluster state will be
- * ACTIVE again, and the next failback plan (if any) will
be processed.
- */
- break;
- } else if (plan.getState() ==
FailbackPlanState.PENDING_ROLLBACK) {
- //this plan failed before sending any requests -> nothing
to rollback
- planId2FailbackPlanMap.remove(plan.getPlanId());
- }
- }
- }
- }
-
- private void handleFailbackRequests(NodeFailbackPlan plan,
ICCMessageBroker messageBroker) {
- //send requests to other nodes to complete on-going jobs and prepare
partitions for failback
- for (PreparePartitionsFailbackRequestMessage request :
plan.getPlanFailbackRequests()) {
- try {
- messageBroker.sendApplicationMessageToNC(request,
request.getNodeID());
- plan.addPendingRequest(request);
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "Failed to send failback request to:
" + request.getNodeID(), e);
- plan.notifyNodeFailure(request.getNodeID());
- revertFailedFailbackPlanEffects();
- break;
- }
- }
- }
-
- public synchronized void
processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage
msg) {
- NodeFailbackPlan plan = planId2FailbackPlanMap.get(msg.getPlanId());
- plan.markRequestCompleted(msg.getRequestId());
- /**
- * A plan at this stage will be in one of three states:
- * 1. PENDING_PARTICIPANT_REPONSE -> one or more responses are still
expected (wait).
- * 2. PENDING_COMPLETION -> all responses received (time to send
completion request).
- * 3. PENDING_ROLLBACK -> the plan failed and we just received the
final pending response (revert).
- */
- if (plan.getState() == FailbackPlanState.PENDING_COMPLETION) {
- CompleteFailbackRequestMessage request =
plan.getCompleteFailbackRequestMessage();
-
- //send complete resync and takeover partitions to the failing back
node
- ICCMessageBroker messageBroker = (ICCMessageBroker)
AppContextInfo.INSTANCE.getCCApplicationContext()
- .getMessageBroker();
- try {
- messageBroker.sendApplicationMessageToNC(request,
request.getNodeId());
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "Failed to send complete failback
request to: " + request.getNodeId(), e);
- notifyFailbackPlansNodeFailure(request.getNodeId());
- revertFailedFailbackPlanEffects();
- }
- } else if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
- revertFailedFailbackPlanEffects();
- }
- }
-
- public synchronized void
processCompleteFailbackResponse(CompleteFailbackResponseMessage response)
- throws HyracksDataException {
- /**
- * the failback plan completed successfully:
- * Remove all references to it.
- * Remove the the failing back node from the failed nodes list.
- * Notify its replicas to reconnect to it.
- * Set the failing back node partitions as active.
- */
- NodeFailbackPlan plan =
planId2FailbackPlanMap.remove(response.getPlanId());
- String nodeId = plan.getNodeId();
- failedNodes.remove(nodeId);
- //notify impacted replicas they can reconnect to this node
- notifyImpactedReplicas(nodeId, ClusterEventType.NODE_JOIN);
- updateNodePartitions(nodeId, true);
- }
-
- private synchronized void notifyImpactedReplicas(String nodeId,
ClusterEventType event) {
- ReplicationProperties replicationProperties =
AppContextInfo.INSTANCE.getReplicationProperties();
- Set<String> remoteReplicas =
replicationProperties.getRemoteReplicasIds(nodeId);
- String nodeIdAddress = "";
- //in case the node joined with a new IP address, we need to send it to
the other replicas
- if (event == ClusterEventType.NODE_JOIN) {
- nodeIdAddress =
activeNcConfiguration.get(nodeId).get(CLUSTER_NET_IP_ADDRESS_KEY);
- }
-
- ReplicaEventMessage msg = new ReplicaEventMessage(nodeId,
nodeIdAddress, event);
- ICCMessageBroker messageBroker = (ICCMessageBroker)
AppContextInfo.INSTANCE.getCCApplicationContext()
- .getMessageBroker();
- for (String replica : remoteReplicas) {
- //if the remote replica is alive, send the event
- if (activeNcConfiguration.containsKey(replica)) {
- try {
- messageBroker.sendApplicationMessageToNC(msg, replica);
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "Failed sending an application
message to an NC", e);
- }
- }
- }
- }
-
- private synchronized void revertFailedFailbackPlanEffects() {
- Iterator<NodeFailbackPlan> iterator =
planId2FailbackPlanMap.values().iterator();
- while (iterator.hasNext()) {
- NodeFailbackPlan plan = iterator.next();
- if (plan.getState() == FailbackPlanState.PENDING_ROLLBACK) {
- //TODO if the failing back node is still active, notify it to
construct a new plan for it
- iterator.remove();
-
- //reassign the partitions that were supposed to be failed back
to an active replica
- requestPartitionsTakeover(plan.getNodeId());
- }
- }
- }
-
- private synchronized void notifyFailbackPlansNodeFailure(String nodeId) {
- Iterator<NodeFailbackPlan> iterator =
planId2FailbackPlanMap.values().iterator();
- while (iterator.hasNext()) {
- NodeFailbackPlan plan = iterator.next();
- plan.notifyNodeFailure(nodeId);
- }
}
public synchronized boolean isMetadataNodeActive() {
@@ -654,9 +299,7 @@
}
}
nodeJSON.put("state", failedNodes.contains(entry.getKey()) ?
"FAILED"
- : allActive ? "ACTIVE"
- : anyActive ? "PARTIALLY_ACTIVE"
- : "INACTIVE");
+ : allActive ? "ACTIVE" : anyActive ? "PARTIALLY_ACTIVE" :
"INACTIVE");
nodeJSON.put("partitions", partitions);
stateDescription.accumulate("ncs", nodeJSON);
}
@@ -670,4 +313,35 @@
stateDescription.put("partitions", clusterPartitions);
return stateDescription;
}
+
+ @Override
+ public Map<String, Map<String, String>> getActiveNcConfiguration() {
+ return Collections.unmodifiableMap(activeNcConfiguration);
+ }
+
+ @Override
+ public String getCurrentMetadataNodeId() {
+ return currentMetadataNode;
+ }
+
+ //TODO messaging should be handled by FaultToleranceManager
+ public synchronized void
processCompleteFailbackResponse(CompleteFailbackResponseMessage msg)
+ throws HyracksDataException {
+ ((AutoFaultToleranceStrategy)
ftStrategy).processCompleteFailbackResponse(msg);
+ }
+
+ public synchronized void
processPreparePartitionsFailbackResponse(PreparePartitionsFailbackResponseMessage
msg) {
+ ((AutoFaultToleranceStrategy)
ftStrategy).processPreparePartitionsFailbackResponse(msg);
+
+ }
+
+ public synchronized void
processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage msg)
+ throws HyracksDataException {
+ ((AutoFaultToleranceStrategy)
ftStrategy).processMetadataNodeTakeoverResponse(msg);
+ }
+
+ public synchronized void
processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage msg) throws
HyracksDataException {
+ ((AutoFaultToleranceStrategy)
ftStrategy).processPartitionTakeoverResponse(msg); // NOTE(review): ClassCastException if ftStrategy is not an AutoFaultToleranceStrategy
+ }
+
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index c81225f..03a3364 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -20,6 +20,7 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionManager;
@@ -30,15 +31,16 @@
public class LogManagerWithReplication extends LogManager {
private IReplicationManager replicationManager;
+ private final IReplicationStrategy replicationStrategy;
- public LogManagerWithReplication(TransactionSubsystem txnSubsystem) {
+ public LogManagerWithReplication(TransactionSubsystem txnSubsystem,
IReplicationStrategy replicationStrategy) {
super(txnSubsystem);
+ this.replicationStrategy = replicationStrategy; // used by log() to decide whether a record is replicated
}
@Override
public void log(ILogRecord logRecord) throws ACIDException {
- //only locally generated logs should be replicated
- logRecord.setReplicated(logRecord.getLogSource() == LogSource.LOCAL &&
logRecord.getLogType() != LogType.WAIT);
+ logRecord.setReplicated(replicationStrategy.isMatch(logRecord));
//Remote flush logs do not need to be flushed separately since they
may not trigger local flush
if (logRecord.getLogType() == LogType.FLUSH &&
logRecord.getLogSource() == LogSource.LOCAL) {
@@ -74,7 +76,8 @@
}
//wait for job Commit/Abort ACK from replicas
- if (logRecord.getLogType() == LogType.JOB_COMMIT ||
logRecord.getLogType() == LogType.ABORT) {
+ if (logRecord.isReplicated() && (logRecord.getLogType() ==
LogType.JOB_COMMIT
+ || logRecord.getLogType() == LogType.ABORT)) {
while
(!replicationManager.hasBeenReplicated(logRecord)) {
try {
logRecord.wait();
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index 09183fe..a3060e0 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -22,11 +22,11 @@
import java.util.concurrent.Future;
import java.util.logging.Logger;
-import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.config.IPropertiesProvider;
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.transactions.Checkpoint;
import org.apache.asterix.common.transactions.CheckpointProperties;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
@@ -70,7 +70,11 @@
this.txnProperties = txnProperties;
this.transactionManager = new TransactionManager(this);
this.lockManager = new
ConcurrentLockManager(txnProperties.getLockManagerShrinkTimer());
- final boolean replicationEnabled =
ClusterProperties.INSTANCE.isReplicationEnabled();
+ ReplicationProperties asterixReplicationProperties =
((IPropertiesProvider) asterixAppRuntimeContextProvider
+ .getAppContext()).getReplicationProperties();
+ IReplicationStrategy replicationStrategy =
asterixReplicationProperties.getReplicationStrategy();
+ final boolean replicationEnabled =
asterixReplicationProperties.getReplicationStrategy().isParticipant(id);
+
final CheckpointProperties checkpointProperties = new
CheckpointProperties(txnProperties, id);
checkpointManager = CheckpointManagerFactory.create(this,
checkpointProperties, replicationEnabled);
final Checkpoint latestCheckpoint = checkpointManager.getLatest();
@@ -80,14 +84,8 @@
latestCheckpoint.getStorageVersion(),
StorageConstants.VERSION));
}
- ReplicationProperties asterixReplicationProperties = null;
- if (asterixAppRuntimeContextProvider != null) {
- asterixReplicationProperties = ((IPropertiesProvider)
asterixAppRuntimeContextProvider
- .getAppContext()).getReplicationProperties();
- }
-
- if (asterixReplicationProperties != null && replicationEnabled) {
- this.logManager = new LogManagerWithReplication(this);
+ if (replicationEnabled) {
+ this.logManager = new LogManagerWithReplication(this,
replicationStrategy);
} else {
this.logManager = new LogManager(this);
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1405
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I1d1012f5541ce786f127866efefb9f3db434fedd
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>