Repository: asterixdb Updated Branches: refs/heads/master c47773778 -> f12288854
[ASTERIXDB-2228][CLUS] IFaultToleranceStrategy -> INcLifecycleCoordinator - user model changes: no - storage format changes: no - interface changes: yes - Rename IFaultToleranceStrategy -> INcLifecycleCoordinator Details: - Rename IFaultToleranceStrategy since it doesn't provide fault tolerance anymore but rather coordinates node lifecycle. - Make INcLifecycleCoordinator extensible. - Remove unneeded FaultToleranceStrategyFactory. Change-Id: I51a2d6f5402630683f709806e6c01f0c7e83914f Reviewed-on: https://asterix-gerrit.ics.uci.edu/2266 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Till Westmann <ti...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/f1228885 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/f1228885 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/f1228885 Branch: refs/heads/master Commit: f1228885446eed8a49db1a674538376f9bb3f62c Parents: c477737 Author: Murtadha Hubail <mhub...@apache.org> Authored: Sun Jan 7 12:31:01 2018 +0300 Committer: Murtadha Hubail <mhub...@apache.org> Committed: Sun Jan 7 11:53:29 2018 -0800 ---------------------------------------------------------------------- .../api/http/server/ClusterApiServlet.java | 2 +- .../FaultToleranceStrategyFactory.java | 38 ---- .../app/replication/NcLifecycleCoordinator.java | 218 ++++++++++++++++++ .../replication/NoFaultToleranceStrategy.java | 221 ------------------- .../message/MetadataNodeResponseMessage.java | 2 +- .../message/NCLifecycleTaskReportMessage.java | 2 +- .../RegistrationTasksRequestMessage.java | 2 +- .../hyracks/bootstrap/CCApplication.java | 19 +- .../runtime/ClusterStateManagerTest.java | 10 +- .../common/dataflow/ICcApplicationContext.java | 6 +- 
.../replication/IFaultToleranceStrategy.java | 75 ------- .../replication/INcLifecycleCoordinator.java | 63 ++++++ .../runtime/utils/CcApplicationContext.java | 8 +- .../runtime/utils/ClusterStateManager.java | 18 +- 14 files changed, 313 insertions(+), 371 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f1228885/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java index 38ee10c..d239038 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java @@ -169,7 +169,7 @@ public class ClusterApiServlet extends AbstractServlet { private void processMetadataNode(IServletRequest request, IServletResponse response) throws HyracksDataException { final String node = request.getParameter("node"); - appCtx.getFaultToleranceStrategy().notifyMetadataNodeChange(node); + appCtx.getNcLifecycleCoordinator().notifyMetadataNodeChange(node); response.setStatus(HttpResponseStatus.OK); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f1228885/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java deleted file mode 100644 index 5a715d6..0000000 --- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.app.replication; - -import org.apache.asterix.common.replication.IFaultToleranceStrategy; -import org.apache.hyracks.api.application.ICCServiceContext; - -public class FaultToleranceStrategyFactory { - - private FaultToleranceStrategyFactory() { - throw new AssertionError(); - } - - public static IFaultToleranceStrategy create(ICCServiceContext serviceCtx, boolean replicationEnabled) { - Class<? 
extends IFaultToleranceStrategy> clazz = NoFaultToleranceStrategy.class; - try { - return clazz.newInstance().from(serviceCtx, replicationEnabled); - } catch (InstantiationException | IllegalAccessException e) { - throw new IllegalStateException(e); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f1228885/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java new file mode 100644 index 0000000..54b1a01 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.replication; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.asterix.app.nc.task.BindMetadataNodeTask; +import org.apache.asterix.app.nc.task.CheckpointTask; +import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask; +import org.apache.asterix.app.nc.task.LocalRecoveryTask; +import org.apache.asterix.app.nc.task.MetadataBootstrapTask; +import org.apache.asterix.app.nc.task.ReportLocalCountersTask; +import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask; +import org.apache.asterix.app.nc.task.StartReplicationServiceTask; +import org.apache.asterix.app.replication.message.MetadataNodeRequestMessage; +import org.apache.asterix.app.replication.message.MetadataNodeResponseMessage; +import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage; +import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage; +import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage; +import org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.asterix.common.cluster.ClusterPartition; +import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; +import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import org.apache.asterix.common.replication.INCLifecycleMessage; +import org.apache.asterix.common.replication.INcLifecycleCoordinator; +import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; +import org.apache.asterix.metadata.MetadataManager; +import org.apache.hyracks.api.application.ICCServiceContext; +import org.apache.hyracks.api.client.NodeStatus; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.logging.log4j.Level; +import 
org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class NcLifecycleCoordinator implements INcLifecycleCoordinator { + + private static final Logger LOGGER = LogManager.getLogger(); + protected IClusterStateManager clusterManager; + protected String metadataNodeId; + protected Set<String> pendingStartupCompletionNodes = new HashSet<>(); + protected final ICCMessageBroker messageBroker; + private final boolean replicationEnabled; + + public NcLifecycleCoordinator(ICCServiceContext serviceCtx, boolean replicationEnabled) { + this.messageBroker = (ICCMessageBroker) serviceCtx.getMessageBroker(); + this.replicationEnabled = replicationEnabled; + } + + @Override + public void notifyNodeJoin(String nodeId) { + pendingStartupCompletionNodes.add(nodeId); + } + + @Override + public void notifyNodeFailure(String nodeId) throws HyracksDataException { + pendingStartupCompletionNodes.remove(nodeId); + clusterManager.updateNodePartitions(nodeId, false); + if (nodeId.equals(metadataNodeId)) { + clusterManager.updateMetadataNode(metadataNodeId, false); + } + clusterManager.refreshState(); + } + + @Override + public void process(INCLifecycleMessage message) throws HyracksDataException { + switch (message.getType()) { + case REGISTRATION_TASKS_REQUEST: + process((RegistrationTasksRequestMessage) message); + break; + case REGISTRATION_TASKS_RESULT: + process((NCLifecycleTaskReportMessage) message); + break; + case METADATA_NODE_RESPONSE: + process((MetadataNodeResponseMessage) message); + break; + default: + throw new RuntimeDataException(ErrorCode.UNSUPPORTED_MESSAGE_TYPE, message.getType().name()); + } + } + + @Override + public void bindTo(IClusterStateManager clusterManager) { + this.clusterManager = clusterManager; + metadataNodeId = clusterManager.getCurrentMetadataNodeId(); + } + + private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException { + final String nodeId = msg.getNodeId(); + List<INCLifecycleTask> tasks = 
buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState()); + RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks); + try { + messageBroker.sendApplicationMessageToNC(response, msg.getNodeId()); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } + + private void process(NCLifecycleTaskReportMessage msg) throws HyracksDataException { + pendingStartupCompletionNodes.remove(msg.getNodeId()); + if (msg.isSuccess()) { + clusterManager.updateNodePartitions(msg.getNodeId(), true); + if (msg.getNodeId().equals(metadataNodeId)) { + clusterManager.updateMetadataNode(metadataNodeId, true); + } + clusterManager.refreshState(); + } else { + if (LOGGER.isErrorEnabled()) { + LOGGER.log(Level.ERROR, msg.getNodeId() + " failed to complete startup. ", msg.getException()); + } + } + } + + protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state) { + LOGGER.log(Level.INFO, () -> "Building registration tasks for node: " + nodeId + " with state: " + state); + final boolean isMetadataNode = nodeId.equals(metadataNodeId); + if (nodeStatus == NodeStatus.ACTIVE) { + /* + * if the node state is already ACTIVE then it completed + * booting and just re-registering with a new/failed CC. 
+ */ + return buildActiveNCRegTasks(isMetadataNode); + } + final List<INCLifecycleTask> tasks = new ArrayList<>(); + if (state == SystemState.CORRUPTED) { + //need to perform local recovery for node partitions + LocalRecoveryTask rt = new LocalRecoveryTask( + Arrays.asList(clusterManager.getNodePartitions(nodeId)).stream() + .map(ClusterPartition::getPartitionId).collect(Collectors.toSet())); + tasks.add(rt); + } + if (replicationEnabled) { + tasks.add(new StartReplicationServiceTask()); + } + if (isMetadataNode) { + tasks.add(new MetadataBootstrapTask()); + } + tasks.add(new ExternalLibrarySetupTask(isMetadataNode)); + tasks.add(new ReportLocalCountersTask()); + tasks.add(new CheckpointTask()); + tasks.add(new StartLifecycleComponentsTask()); + if (isMetadataNode) { + tasks.add(new BindMetadataNodeTask(true)); + } + return tasks; + } + + protected List<INCLifecycleTask> buildActiveNCRegTasks(boolean metadataNode) { + final List<INCLifecycleTask> tasks = new ArrayList<>(); + if (metadataNode) { + // need to unbind from old distributed state then rebind to new one + tasks.add(new BindMetadataNodeTask(false)); + tasks.add(new BindMetadataNodeTask(true)); + } + tasks.add(new ReportLocalCountersTask()); + return tasks; + } + + @Override + public void notifyMetadataNodeChange(String node) throws HyracksDataException { + if (metadataNodeId.equals(node)) { + return; + } + // if current metadata node is active, we need to unbind its metadata proxy object + if (clusterManager.isMetadataNodeActive()) { + MetadataNodeRequestMessage msg = new MetadataNodeRequestMessage(false); + try { + messageBroker.sendApplicationMessageToNC(msg, metadataNodeId); + // when the current node responses, we will bind to the new one + metadataNodeId = node; + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } else { + requestMetadataNodeTakeover(node); + } + } + + private void process(MetadataNodeResponseMessage response) throws HyracksDataException { + // rebind metadata 
node since it might be changing + MetadataManager.INSTANCE.rebindMetadataNode(); + clusterManager.updateMetadataNode(response.getNodeId(), response.isExported()); + if (!response.isExported()) { + requestMetadataNodeTakeover(metadataNodeId); + } + } + + private void requestMetadataNodeTakeover(String node) throws HyracksDataException { + MetadataNodeRequestMessage msg = new MetadataNodeRequestMessage(true); + try { + messageBroker.sendApplicationMessageToNC(msg, node); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f1228885/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java deleted file mode 100644 index 49f4b32..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.app.replication; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -import org.apache.asterix.app.nc.task.BindMetadataNodeTask; -import org.apache.asterix.app.nc.task.CheckpointTask; -import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask; -import org.apache.asterix.app.nc.task.LocalRecoveryTask; -import org.apache.asterix.app.nc.task.MetadataBootstrapTask; -import org.apache.asterix.app.nc.task.ReportLocalCountersTask; -import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask; -import org.apache.asterix.app.nc.task.StartReplicationServiceTask; -import org.apache.asterix.app.replication.message.MetadataNodeRequestMessage; -import org.apache.asterix.app.replication.message.MetadataNodeResponseMessage; -import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage; -import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage; -import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage; -import org.apache.asterix.common.api.INCLifecycleTask; -import org.apache.asterix.common.cluster.ClusterPartition; -import org.apache.asterix.common.cluster.IClusterStateManager; -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.asterix.common.messaging.api.ICCMessageBroker; -import org.apache.asterix.common.replication.IFaultToleranceStrategy; -import org.apache.asterix.common.replication.INCLifecycleMessage; -import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; -import org.apache.asterix.metadata.MetadataManager; -import org.apache.hyracks.api.application.ICCServiceContext; -import org.apache.hyracks.api.client.NodeStatus; -import 
org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class NoFaultToleranceStrategy implements IFaultToleranceStrategy { - - private static final Logger LOGGER = LogManager.getLogger(); - private IClusterStateManager clusterManager; - private String metadataNodeId; - private Set<String> pendingStartupCompletionNodes = new HashSet<>(); - private ICCMessageBroker messageBroker; - private boolean replicationEnabled; - - @Override - public void notifyNodeJoin(String nodeId) throws HyracksDataException { - pendingStartupCompletionNodes.add(nodeId); - } - - @Override - public void notifyNodeFailure(String nodeId) throws HyracksDataException { - pendingStartupCompletionNodes.remove(nodeId); - clusterManager.updateNodePartitions(nodeId, false); - if (nodeId.equals(metadataNodeId)) { - clusterManager.updateMetadataNode(metadataNodeId, false); - } - clusterManager.refreshState(); - } - - @Override - public void process(INCLifecycleMessage message) throws HyracksDataException { - switch (message.getType()) { - case REGISTRATION_TASKS_REQUEST: - process((RegistrationTasksRequestMessage) message); - break; - case REGISTRATION_TASKS_RESULT: - process((NCLifecycleTaskReportMessage) message); - break; - case METADATA_NODE_RESPONSE: - process((MetadataNodeResponseMessage) message); - break; - default: - throw new RuntimeDataException(ErrorCode.UNSUPPORTED_MESSAGE_TYPE, message.getType().name()); - } - } - - @Override - public IFaultToleranceStrategy from(ICCServiceContext serviceCtx, boolean replicationEnabled) { - NoFaultToleranceStrategy ft = new NoFaultToleranceStrategy(); - ft.messageBroker = (ICCMessageBroker) serviceCtx.getMessageBroker(); - ft.replicationEnabled = replicationEnabled; - return ft; - } - - @Override - public void bindTo(IClusterStateManager clusterManager) { - this.clusterManager = clusterManager; - metadataNodeId = 
clusterManager.getCurrentMetadataNodeId(); - } - - private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException { - final String nodeId = msg.getNodeId(); - List<INCLifecycleTask> tasks = buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState()); - RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks); - try { - messageBroker.sendApplicationMessageToNC(response, msg.getNodeId()); - } catch (Exception e) { - throw HyracksDataException.create(e); - } - } - - private void process(NCLifecycleTaskReportMessage msg) throws HyracksDataException { - pendingStartupCompletionNodes.remove(msg.getNodeId()); - if (msg.isSuccess()) { - clusterManager.updateNodePartitions(msg.getNodeId(), true); - if (msg.getNodeId().equals(metadataNodeId)) { - clusterManager.updateMetadataNode(metadataNodeId, true); - } - clusterManager.refreshState(); - } else { - if (LOGGER.isErrorEnabled()) { - LOGGER.log(Level.ERROR, msg.getNodeId() + " failed to complete startup. ", msg.getException()); - } - } - } - - private List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state) { - LOGGER.log(Level.INFO, () -> "Building registration tasks for node: " + nodeId + " with state: " + state); - final boolean isMetadataNode = nodeId.equals(metadataNodeId); - if (nodeStatus == NodeStatus.ACTIVE) { - /* - * if the node state is already ACTIVE then it completed - * booting and just re-registering with a new/failed CC. 
- */ - return buildActiveNCRegTasks(isMetadataNode); - } - final List<INCLifecycleTask> tasks = new ArrayList<>(); - if (state == SystemState.CORRUPTED) { - //need to perform local recovery for node partitions - LocalRecoveryTask rt = new LocalRecoveryTask( - Arrays.asList(clusterManager.getNodePartitions(nodeId)).stream() - .map(ClusterPartition::getPartitionId).collect(Collectors.toSet())); - tasks.add(rt); - } - if (replicationEnabled) { - tasks.add(new StartReplicationServiceTask()); - } - if (isMetadataNode) { - tasks.add(new MetadataBootstrapTask()); - } - tasks.add(new ExternalLibrarySetupTask(isMetadataNode)); - tasks.add(new ReportLocalCountersTask()); - tasks.add(new CheckpointTask()); - tasks.add(new StartLifecycleComponentsTask()); - if (isMetadataNode) { - tasks.add(new BindMetadataNodeTask(true)); - } - return tasks; - } - - private List<INCLifecycleTask> buildActiveNCRegTasks(boolean metadataNode) { - final List<INCLifecycleTask> tasks = new ArrayList<>(); - if (metadataNode) { - // need to unbind from old distributed state then rebind to new one - tasks.add(new BindMetadataNodeTask(false)); - tasks.add(new BindMetadataNodeTask(true)); - } - tasks.add(new ReportLocalCountersTask()); - return tasks; - } - - @Override - public void notifyMetadataNodeChange(String node) throws HyracksDataException { - if (metadataNodeId.equals(node)) { - return; - } - // if current metadata node is active, we need to unbind its metadata proxy object - if (clusterManager.isMetadataNodeActive()) { - MetadataNodeRequestMessage msg = new MetadataNodeRequestMessage(false); - try { - messageBroker.sendApplicationMessageToNC(msg, metadataNodeId); - // when the current node responses, we will bind to the new one - metadataNodeId = node; - } catch (Exception e) { - throw HyracksDataException.create(e); - } - } else { - requestMetadataNodeTakeover(node); - } - } - - private void process(MetadataNodeResponseMessage response) throws HyracksDataException { - // rebind metadata node 
since it might be changing - MetadataManager.INSTANCE.rebindMetadataNode(); - clusterManager.updateMetadataNode(response.getNodeId(), response.isExported()); - if (!response.isExported()) { - requestMetadataNodeTakeover(metadataNodeId); - } - } - - private void requestMetadataNodeTakeover(String node) throws HyracksDataException { - MetadataNodeRequestMessage msg = new MetadataNodeRequestMessage(true); - try { - messageBroker.sendApplicationMessageToNC(msg, node); - } catch (Exception e) { - throw HyracksDataException.create(e); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f1228885/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeResponseMessage.java index ebde9b9..815d878 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeResponseMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeResponseMessage.java @@ -41,7 +41,7 @@ public class MetadataNodeResponseMessage implements INCLifecycleMessage, ICcAddr @Override public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { - ((CcApplicationContext) appCtx).getFaultToleranceStrategy().process(this); + ((CcApplicationContext) appCtx).getNcLifecycleCoordinator().process(this); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f1228885/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java index b654fd8..5ac3a0c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java @@ -37,7 +37,7 @@ public class NCLifecycleTaskReportMessage implements INCLifecycleMessage, ICcAdd @Override public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { - appCtx.getFaultToleranceStrategy().process(this); + appCtx.getNcLifecycleCoordinator().process(this); } public String getNodeId() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f1228885/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java index b60ef8b..6ca576a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java @@ -58,7 +58,7 @@ public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICc @Override public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { - appCtx.getFaultToleranceStrategy().process(this); + appCtx.getNcLifecycleCoordinator().process(this); } public SystemState getState() { 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f1228885/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index c2a0fc1..11f4e1c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -55,7 +55,7 @@ import org.apache.asterix.api.http.server.VersionApiServlet; import org.apache.asterix.app.active.ActiveNotificationHandler; import org.apache.asterix.app.cc.CCExtensionManager; import org.apache.asterix.app.external.ExternalLibraryUtils; -import org.apache.asterix.app.replication.FaultToleranceStrategyFactory; +import org.apache.asterix.app.replication.NcLifecycleCoordinator; import org.apache.asterix.common.api.AsterixThreadFactory; import org.apache.asterix.common.api.IClusterManagementWork; import org.apache.asterix.common.api.INodeJobTracker; @@ -67,9 +67,7 @@ import org.apache.asterix.common.config.ReplicationProperties; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.library.ILibraryManager; -import org.apache.asterix.common.replication.IFaultToleranceStrategy; -import org.apache.asterix.common.replication.IReplicationStrategy; -import org.apache.asterix.common.replication.ReplicationStrategyFactory; +import org.apache.asterix.common.replication.INcLifecycleCoordinator; import org.apache.asterix.common.utils.Servlets; import org.apache.asterix.external.library.ExternalLibraryManager; import org.apache.asterix.file.StorageComponentProvider; @@ -142,13 +140,12 @@ public class 
CCApplication extends BaseCCApplication { ILibraryManager libraryManager = new ExternalLibraryManager(); ReplicationProperties repProp = new ReplicationProperties( PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig())); - IFaultToleranceStrategy ftStrategy = - FaultToleranceStrategyFactory.create(ccServiceCtx, repProp.isReplicationEnabled()); + INcLifecycleCoordinator lifecycleCoordinator = createNcLifeCycleCoordinator(repProp.isReplicationEnabled()); ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false); componentProvider = new StorageComponentProvider(); GlobalRecoveryManager globalRecoveryManager = createGlobalRecoveryManager(); statementExecutorCtx = new StatementExecutorContext(); - appCtx = createApplicationContext(libraryManager, globalRecoveryManager, ftStrategy); + appCtx = createApplicationContext(libraryManager, globalRecoveryManager, lifecycleCoordinator); List<AsterixExtension> extensions = new ArrayList<>(); extensions.addAll(this.getExtensions()); ccExtensionManager = new CCExtensionManager(extensions); @@ -177,10 +174,10 @@ public class CCApplication extends BaseCCApplication { } protected ICcApplicationContext createApplicationContext(ILibraryManager libraryManager, - GlobalRecoveryManager globalRecoveryManager, IFaultToleranceStrategy ftStrategy) + GlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator lifecycleCoordinator) throws AlgebricksException, IOException { return new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, () -> MetadataManager.INSTANCE, - globalRecoveryManager, ftStrategy, new ActiveNotificationHandler(), componentProvider, + globalRecoveryManager, lifecycleCoordinator, new ActiveNotificationHandler(), componentProvider, new MetadataLockManager()); } @@ -188,6 +185,10 @@ public class CCApplication extends BaseCCApplication { return new GlobalRecoveryManager(ccServiceCtx, getHcc(), componentProvider); } + protected INcLifecycleCoordinator createNcLifeCycleCoordinator(boolean 
replicationEnabled) { + return new NcLifecycleCoordinator(ccServiceCtx, replicationEnabled); + } + @Override protected void configureLoggingLevel(Level level) { super.configureLoggingLevel(level); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f1228885/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java index 07d3584..ca314aa 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java @@ -24,7 +24,7 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; -import org.apache.asterix.app.replication.NoFaultToleranceStrategy; +import org.apache.asterix.app.replication.NcLifecycleCoordinator; import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage; import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.cluster.ClusterPartition; @@ -196,7 +196,7 @@ public class ClusterStateManagerTest { throws HyracksDataException { NCLifecycleTaskReportMessage msg = new NCLifecycleTaskReportMessage(nodeId, true); applicationContext.getResourceIdManager().report(nodeId, 0); - applicationContext.getFaultToleranceStrategy().process(msg); + applicationContext.getNcLifecycleCoordinator().process(msg); } private CcApplicationContext ccAppContext(ClusterStateManager csm) throws HyracksDataException { @@ -207,9 +207,9 @@ public class ClusterStateManagerTest { Mockito.when(iccServiceContext.getAppConfig()).thenReturn(applicationConfig); Mockito.when(ccApplicationContext.getServiceContext()).thenReturn(iccServiceContext); - 
NoFaultToleranceStrategy fts = new NoFaultToleranceStrategy(); - fts.bindTo(csm); - Mockito.when(ccApplicationContext.getFaultToleranceStrategy()).thenReturn(fts); + NcLifecycleCoordinator coordinator = new NcLifecycleCoordinator(ccApplicationContext.getServiceContext(), false); + coordinator.bindTo(csm); + Mockito.when(ccApplicationContext.getNcLifecycleCoordinator()).thenReturn(coordinator); MetadataProperties metadataProperties = mockMetadataProperties(); Mockito.when(ccApplicationContext.getMetadataProperties()).thenReturn(metadataProperties); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f1228885/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java index 6181ade..e02482d 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java @@ -26,7 +26,7 @@ import org.apache.asterix.common.cluster.IGlobalRecoveryManager; import org.apache.asterix.common.config.ExtensionProperties; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.metadata.IMetadataBootstrap; -import org.apache.asterix.common.replication.IFaultToleranceStrategy; +import org.apache.asterix.common.replication.INcLifecycleCoordinator; import org.apache.asterix.common.transactions.IResourceIdManager; import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.job.IJobLifecycleListener; @@ -62,9 +62,9 @@ public interface ICcApplicationContext extends IApplicationContext { IGlobalRecoveryManager getGlobalRecoveryManager(); /** - * 
@return the fault tolerance strategy in use for the cluster + * @return the NC lifecycle coordinator in use for the cluster */ - IFaultToleranceStrategy getFaultToleranceStrategy(); + INcLifecycleCoordinator getNcLifecycleCoordinator(); /** * @return the active notification handler at Cluster controller http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f1228885/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java deleted file mode 100644 index bf4cac9..0000000 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.common.replication; - -import org.apache.asterix.common.cluster.IClusterStateManager; -import org.apache.hyracks.api.application.ICCServiceContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; - -public interface IFaultToleranceStrategy { - - /** - * Defines the logic of a {@link IFaultToleranceStrategy} when a node joins the cluster. - * - * @param nodeId - * @throws HyracksDataException - */ - void notifyNodeJoin(String nodeId) throws HyracksDataException; - - /** - * Defines the logic of a {@link IFaultToleranceStrategy} when a node leaves the cluster. - * - * @param nodeId - * @throws HyracksDataException - */ - void notifyNodeFailure(String nodeId) throws HyracksDataException; - - /** - * Binds the fault tolerance strategy to {@code cluserManager}. - * - * @param clusterManager - */ - void bindTo(IClusterStateManager clusterManager); - - /** - * Processes {@code message} based on the message type. - * - * @param message - * @throws HyracksDataException - */ - void process(INCLifecycleMessage message) throws HyracksDataException; - - /** - * Constructs a fault tolerance strategy. 
- * - * @param serviceCtx - * @param replicationEnabled - * @return the fault tolerance strategy - */ - IFaultToleranceStrategy from(ICCServiceContext serviceCtx, boolean replicationEnabled); - - /** - * Performs the required steps to change the metadata node to {@code node} - * - * @param node - */ - default void notifyMetadataNodeChange(String node) throws HyracksDataException { - throw new UnsupportedOperationException(getClass() + " does not support metadata node change"); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f1228885/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INcLifecycleCoordinator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INcLifecycleCoordinator.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INcLifecycleCoordinator.java new file mode 100644 index 0000000..1a7c3c8 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INcLifecycleCoordinator.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.common.replication; + +import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public interface INcLifecycleCoordinator { + + /** + * Defines the logic of a {@link INcLifecycleCoordinator} when a node joins the cluster. + * + * @param nodeId + * @throws HyracksDataException + */ + void notifyNodeJoin(String nodeId) throws HyracksDataException; + + /** + * Defines the logic of a {@link INcLifecycleCoordinator} when a node leaves the cluster. + * + * @param nodeId + * @throws HyracksDataException + */ + void notifyNodeFailure(String nodeId) throws HyracksDataException; + + /** + * Binds the coordinator to {@code clusterManager}. + * + * @param clusterManager + */ + void bindTo(IClusterStateManager clusterManager); + + /** + * Processes {@code message} based on the message type. + * + * @param message + * @throws HyracksDataException + */ + void process(INCLifecycleMessage message) throws HyracksDataException; + + /** + * Performs the required steps to change the metadata node to {@code node} + * + * @param node + */ + void notifyMetadataNodeChange(String node) throws HyracksDataException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f1228885/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java index f991cd4..d8f14a2 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java @@ -42,7 +42,7 @@ import
org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.common.metadata.IMetadataBootstrap; -import org.apache.asterix.common.replication.IFaultToleranceStrategy; +import org.apache.asterix.common.replication.INcLifecycleCoordinator; import org.apache.asterix.common.transactions.IResourceIdManager; import org.apache.asterix.runtime.job.listener.NodeJobTracker; import org.apache.asterix.runtime.transaction.ResourceIdManager; @@ -80,7 +80,7 @@ public class CcApplicationContext implements ICcApplicationContext { private Supplier<IMetadataBootstrap> metadataBootstrapSupplier; private IHyracksClientConnection hcc; private Object extensionManager; - private IFaultToleranceStrategy ftStrategy; + private INcLifecycleCoordinator ftStrategy; private IJobLifecycleListener activeLifeCycleListener; private IMetadataLockManager mdLockManager; private IClusterStateManager clusterStateManager; @@ -88,7 +88,7 @@ public class CcApplicationContext implements ICcApplicationContext { public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc, ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier, - IGlobalRecoveryManager globalRecoveryManager, IFaultToleranceStrategy ftStrategy, + IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator ftStrategy, IJobLifecycleListener activeLifeCycleListener, IStorageComponentProvider storageComponentProvider, IMetadataLockManager mdLockManager) throws AlgebricksException, IOException { this.ccServiceCtx = ccServiceCtx; @@ -232,7 +232,7 @@ public class CcApplicationContext implements ICcApplicationContext { } @Override - public IFaultToleranceStrategy getFaultToleranceStrategy() { + public INcLifecycleCoordinator getNcLifecycleCoordinator() { return ftStrategy; } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/f1228885/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index c07200a..6994862 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedMap; @@ -35,7 +34,7 @@ import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.replication.IFaultToleranceStrategy; +import org.apache.asterix.common.replication.INcLifecycleCoordinator; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.transactions.IResourceIdManager; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; @@ -60,11 +59,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; */ public class ClusterStateManager implements IClusterStateManager { - /* - * TODO: currently after instance restarts we require all nodes to join again, - * otherwise the cluster wont be ACTIVE. we may overcome this by storing the cluster state before the instance - * shutdown and using it on startup to identify the nodes that are expected the join. 
- */ private static final Logger LOGGER = LogManager.getLogger(); private final Map<String, Map<IOption, Object>> ncConfigMap = new HashMap<>(); @@ -77,7 +71,7 @@ public class ClusterStateManager implements IClusterStateManager { private boolean metadataNodeActive = false; private Set<String> failedNodes = new HashSet<>(); private Set<String> participantNodes = new HashSet<>(); - private IFaultToleranceStrategy ftStrategy; + private INcLifecycleCoordinator lifecycleCoordinator; private ICcApplicationContext appCtx; @Override @@ -86,8 +80,8 @@ public class ClusterStateManager implements IClusterStateManager { node2PartitionsMap = appCtx.getMetadataProperties().getNodePartitions(); clusterPartitions = appCtx.getMetadataProperties().getClusterPartitions(); currentMetadataNode = appCtx.getMetadataProperties().getMetadataNodeName(); - ftStrategy = appCtx.getFaultToleranceStrategy(); - ftStrategy.bindTo(this); + lifecycleCoordinator = appCtx.getNcLifecycleCoordinator(); + lifecycleCoordinator.bindTo(this); } @Override @@ -98,7 +92,7 @@ public class ClusterStateManager implements IClusterStateManager { failedNodes.add(nodeId); ncConfigMap.remove(nodeId); pendingRemoval.remove(nodeId); - ftStrategy.notifyNodeFailure(nodeId); + lifecycleCoordinator.notifyNodeFailure(nodeId); } @Override @@ -109,7 +103,7 @@ public class ClusterStateManager implements IClusterStateManager { failedNodes.remove(nodeId); ncConfigMap.put(nodeId, configuration); updateNodeConfig(nodeId, configuration); - ftStrategy.notifyNodeJoin(nodeId); + lifecycleCoordinator.notifyNodeJoin(nodeId); } @Override