Murtadha Hubail has submitted this change and it was merged. Change subject: [NO ISSUE][TX] Ensure TxnIdFactory Value is Initialized ......................................................................
[NO ISSUE][TX] Ensure TxnIdFactory Value is Initialized - user model changes: no - storage format changes: no - interface changes: no Details: - Report local max txn id after node registration. - Add node status BOOTING. - Distinguish between a node's first-time registration (sent with NodeStatus BOOTING) and the re-registration of an already ACTIVE node with a new/restarted CC, so that the CC responds with the proper post-registration tasks. - Rename node status ALIVE -> ACTIVE. - Rename StartupTask* to RegistrationTasks* Change-Id: I6899c9e7d6e744ca92d0108556e086a23639d78b Reviewed-on: https://asterix-gerrit.ics.uci.edu/2151 Reviewed-by: Michael Blow <[email protected]> Tested-by: Michael Blow <[email protected]> Sonar-Qube: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java R asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java R asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java R asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java R asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java R 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java M hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java M hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java 18 files changed, 127 insertions(+), 86 deletions(-) Approvals: Jenkins: Verified; No violations found; Verified Michael Blow: Looks good to me, approved; Verified Objections: Anon. E. Moose #1000171: Violations found diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index 7b08f68..e77d535 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -481,7 +481,9 @@ @Override public synchronized void unexportMetadataNodeStub() throws RemoteException { - UnicastRemoteObject.unexportObject(MetadataNode.INSTANCE, false); + if (metadataNodeStub != null) { + UnicastRemoteObject.unexportObject(MetadataNode.INSTANCE, false); + } metadataNodeStub = null; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java similarity index 87% rename from 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java index 22d3cde..86f7d1c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportMaxResourceIdTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java @@ -19,18 +19,18 @@ package org.apache.asterix.app.nc.task; import org.apache.asterix.common.api.INCLifecycleTask; -import org.apache.asterix.runtime.message.ReportMaxResourceIdMessage; +import org.apache.asterix.runtime.message.ReportLocalCountersMessage; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.service.IControllerService; import org.apache.hyracks.control.nc.NodeControllerService; -public class ReportMaxResourceIdTask implements INCLifecycleTask { +public class ReportLocalCountersTask implements INCLifecycleTask { private static final long serialVersionUID = 1L; @Override public void perform(IControllerService cs) throws HyracksDataException { - ReportMaxResourceIdMessage.send((NodeControllerService) cs); + ReportLocalCountersMessage.send((NodeControllerService) cs); } @Override diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java index 4ac1305..23f225e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java @@ -33,7 +33,7 @@ import org.apache.asterix.app.nc.task.CheckpointTask; import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask; import org.apache.asterix.app.nc.task.MetadataBootstrapTask; -import 
org.apache.asterix.app.nc.task.ReportMaxResourceIdTask; +import org.apache.asterix.app.nc.task.ReportLocalCountersTask; import org.apache.asterix.app.nc.task.StartFailbackTask; import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask; import org.apache.asterix.app.nc.task.StartReplicationServiceTask; @@ -43,8 +43,8 @@ import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage; import org.apache.asterix.app.replication.message.PreparePartitionsFailbackRequestMessage; import org.apache.asterix.app.replication.message.PreparePartitionsFailbackResponseMessage; -import org.apache.asterix.app.replication.message.StartupTaskRequestMessage; -import org.apache.asterix.app.replication.message.StartupTaskResponseMessage; +import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage; +import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage; import org.apache.asterix.app.replication.message.TakeoverMetadataNodeRequestMessage; import org.apache.asterix.app.replication.message.TakeoverMetadataNodeResponseMessage; import org.apache.asterix.app.replication.message.TakeoverPartitionsRequestMessage; @@ -431,10 +431,10 @@ @Override public synchronized void process(INCLifecycleMessage message) throws HyracksDataException { switch (message.getType()) { - case STARTUP_TASK_REQUEST: - process((StartupTaskRequestMessage) message); + case REGISTRATION_TASKS_REQUEST: + process((RegistrationTasksRequestMessage) message); break; - case STARTUP_TASK_RESULT: + case REGISTRATION_TASKS_RESULT: process((NCLifecycleTaskReportMessage) message); break; case TAKEOVER_PARTITION_RESPONSE: @@ -483,7 +483,7 @@ currentMetadataNode = clusterManager.getCurrentMetadataNodeId(); } - private synchronized void process(StartupTaskRequestMessage msg) throws HyracksDataException { + private synchronized void process(RegistrationTasksRequestMessage msg) throws HyracksDataException { final String nodeId = msg.getNodeId(); final SystemState 
state = msg.getState(); List<INCLifecycleTask> tasks; @@ -493,7 +493,7 @@ // failed node returned. Need to start failback process tasks = buildFailbackStartupSequence(); } - StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks); + RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks); try { messageBroker.sendApplicationMessageToNC(response, msg.getNodeId()); } catch (Exception e) { @@ -504,7 +504,7 @@ private List<INCLifecycleTask> buildFailbackStartupSequence() { final List<INCLifecycleTask> tasks = new ArrayList<>(); tasks.add(new StartFailbackTask()); - tasks.add(new ReportMaxResourceIdTask()); + tasks.add(new ReportLocalCountersTask()); tasks.add(new StartLifecycleComponentsTask()); return tasks; } @@ -517,7 +517,7 @@ tasks.add(new MetadataBootstrapTask()); } tasks.add(new ExternalLibrarySetupTask(isMetadataNode)); - tasks.add(new ReportMaxResourceIdTask()); + tasks.add(new ReportLocalCountersTask()); tasks.add(new CheckpointTask()); tasks.add(new StartLifecycleComponentsTask()); if (isMetadataNode) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java index 1b57403..3341813 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java @@ -35,14 +35,14 @@ import org.apache.asterix.app.nc.task.LocalRecoveryTask; import org.apache.asterix.app.nc.task.MetadataBootstrapTask; import org.apache.asterix.app.nc.task.RemoteRecoveryTask; -import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask; +import org.apache.asterix.app.nc.task.ReportLocalCountersTask; import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask; import 
org.apache.asterix.app.nc.task.StartReplicationServiceTask; import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage; +import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage; import org.apache.asterix.app.replication.message.ReplayPartitionLogsRequestMessage; import org.apache.asterix.app.replication.message.ReplayPartitionLogsResponseMessage; -import org.apache.asterix.app.replication.message.StartupTaskRequestMessage; -import org.apache.asterix.app.replication.message.StartupTaskResponseMessage; +import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.cluster.IClusterStateManager; @@ -123,10 +123,10 @@ @Override public synchronized void process(INCLifecycleMessage message) throws HyracksDataException { switch (message.getType()) { - case STARTUP_TASK_REQUEST: - process((StartupTaskRequestMessage) message); + case REGISTRATION_TASKS_REQUEST: + process((RegistrationTasksRequestMessage) message); break; - case STARTUP_TASK_RESULT: + case REGISTRATION_TASKS_RESULT: process((NCLifecycleTaskReportMessage) message); break; case REPLAY_LOGS_RESPONSE: @@ -150,7 +150,7 @@ } } - private synchronized void process(StartupTaskRequestMessage msg) throws HyracksDataException { + private synchronized void process(RegistrationTasksRequestMessage msg) throws HyracksDataException { final String nodeId = msg.getNodeId(); final SystemState state = msg.getState(); final boolean isParticipant = replicationStrategy.isParticipant(nodeId); @@ -160,7 +160,7 @@ } else { tasks = buildParticipantStartupSequence(nodeId, state); } - StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks); + RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks); try { messageBroker.sendApplicationMessageToNC(response, 
msg.getNodeId()); } catch (Exception e) { @@ -199,7 +199,7 @@ tasks.add(rt); } tasks.add(new ExternalLibrarySetupTask(false)); - tasks.add(new ReportMaxResourceIdTask()); + tasks.add(new ReportLocalCountersTask()); tasks.add(new CheckpointTask()); tasks.add(new StartLifecycleComponentsTask()); return tasks; @@ -234,7 +234,7 @@ tasks.add(new MetadataBootstrapTask()); } tasks.add(new ExternalLibrarySetupTask(isMetadataNode)); - tasks.add(new ReportMaxResourceIdTask()); + tasks.add(new ReportLocalCountersTask()); tasks.add(new CheckpointTask()); tasks.add(new StartLifecycleComponentsTask()); if (isMetadataNode) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java index b9ea135..a273845 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java @@ -32,11 +32,11 @@ import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask; import org.apache.asterix.app.nc.task.LocalRecoveryTask; import org.apache.asterix.app.nc.task.MetadataBootstrapTask; -import org.apache.asterix.app.nc.task.ReportMaxResourceIdTask; +import org.apache.asterix.app.nc.task.ReportLocalCountersTask; import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask; import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage; -import org.apache.asterix.app.replication.message.StartupTaskRequestMessage; -import org.apache.asterix.app.replication.message.StartupTaskResponseMessage; +import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage; +import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.cluster.ClusterPartition; 
import org.apache.asterix.common.cluster.IClusterStateManager; @@ -48,12 +48,13 @@ import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; import org.apache.hyracks.api.application.ICCServiceContext; +import org.apache.hyracks.api.client.NodeStatus; import org.apache.hyracks.api.exceptions.HyracksDataException; public class NoFaultToleranceStrategy implements IFaultToleranceStrategy { private static final Logger LOGGER = Logger.getLogger(NoFaultToleranceStrategy.class.getName()); - IClusterStateManager clusterManager; + private IClusterStateManager clusterManager; private String metadataNodeId; private Set<String> pendingStartupCompletionNodes = new HashSet<>(); private ICCMessageBroker messageBroker; @@ -76,10 +77,10 @@ @Override public void process(INCLifecycleMessage message) throws HyracksDataException { switch (message.getType()) { - case STARTUP_TASK_REQUEST: - process((StartupTaskRequestMessage) message); + case REGISTRATION_TASKS_REQUEST: + process((RegistrationTasksRequestMessage) message); break; - case STARTUP_TASK_RESULT: + case REGISTRATION_TASKS_RESULT: process((NCLifecycleTaskReportMessage) message); break; default: @@ -100,10 +101,10 @@ metadataNodeId = clusterManager.getCurrentMetadataNodeId(); } - private void process(StartupTaskRequestMessage msg) throws HyracksDataException { + private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException { final String nodeId = msg.getNodeId(); - List<INCLifecycleTask> tasks = buildNCStartupSequence(msg.getNodeId(), msg.getState()); - StartupTaskResponseMessage response = new StartupTaskResponseMessage(nodeId, tasks); + List<INCLifecycleTask> tasks = buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState()); + RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks); try { messageBroker.sendApplicationMessageToNC(response, msg.getNodeId()); } catch 
(Exception e) { @@ -126,7 +127,16 @@ } } - private List<INCLifecycleTask> buildNCStartupSequence(String nodeId, SystemState state) { + private List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state) { + LOGGER.log(Level.INFO, () -> "Building registration tasks for node: " + nodeId + " with state: " + state); + final boolean isMetadataNode = nodeId.equals(metadataNodeId); + if (nodeStatus == NodeStatus.ACTIVE) { + /* + * if the node state is already ACTIVE then it completed + * booting and just re-registering with a new/failed CC. + */ + return buildActiveNCRegTasks(isMetadataNode); + } final List<INCLifecycleTask> tasks = new ArrayList<>(); if (state == SystemState.CORRUPTED) { //need to perform local recovery for node partitions @@ -134,12 +144,11 @@ .stream().map(ClusterPartition::getPartitionId).collect(Collectors.toSet())); tasks.add(rt); } - final boolean isMetadataNode = nodeId.equals(metadataNodeId); if (isMetadataNode) { tasks.add(new MetadataBootstrapTask()); } tasks.add(new ExternalLibrarySetupTask(isMetadataNode)); - tasks.add(new ReportMaxResourceIdTask()); + tasks.add(new ReportLocalCountersTask()); tasks.add(new CheckpointTask()); tasks.add(new StartLifecycleComponentsTask()); if (isMetadataNode) { @@ -147,4 +156,15 @@ } return tasks; } + + private List<INCLifecycleTask> buildActiveNCRegTasks(boolean metadataNode) { + final List<INCLifecycleTask> tasks = new ArrayList<>(); + if (metadataNode) { + // need to unbind from old distributed state then rebind to new one + tasks.add(new BindMetadataNodeTask(false)); + tasks.add(new BindMetadataNodeTask(true)); + } + tasks.add(new ReportLocalCountersTask()); + return tasks; + } } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java index 2b32e1f..b654fd8 
100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java @@ -58,6 +58,6 @@ @Override public MessageType getType() { - return MessageType.STARTUP_TASK_RESULT; + return MessageType.REGISTRATION_TASKS_RESULT; } } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java similarity index 69% rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java index 21dee9c..075c415 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java @@ -26,27 +26,32 @@ import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.replication.INCLifecycleMessage; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; +import org.apache.hyracks.api.client.NodeStatus; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.control.nc.NodeControllerService; -public class StartupTaskRequestMessage implements INCLifecycleMessage, ICcAddressedMessage { +public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICcAddressedMessage { - private static final Logger LOGGER = Logger.getLogger(StartupTaskRequestMessage.class.getName()); + private static final Logger LOGGER = 
Logger.getLogger(RegistrationTasksRequestMessage.class.getName()); private static final long serialVersionUID = 1L; private final SystemState state; private final String nodeId; + private final NodeStatus nodeStatus; - public StartupTaskRequestMessage(String nodeId, SystemState state) { + public RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state) { this.state = state; this.nodeId = nodeId; + this.nodeStatus = nodeStatus; } - public static void send(NodeControllerService cs, SystemState systemState) throws HyracksDataException { + public static void send(NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState) + throws HyracksDataException { try { - StartupTaskRequestMessage msg = new StartupTaskRequestMessage(cs.getId(), systemState); + RegistrationTasksRequestMessage msg = new RegistrationTasksRequestMessage(cs.getId(), nodeStatus, + systemState); ((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(msg); } catch (Exception e) { - LOGGER.log(Level.SEVERE, "Unable to send StartupTaskRequestMessage to CC", e); + LOGGER.log(Level.SEVERE, "Unable to send RegistrationTasksRequestMessage to CC", e); throw HyracksDataException.create(e); } } @@ -64,8 +69,13 @@ return nodeId; } + public NodeStatus getNodeStatus() { + return nodeStatus; + } + @Override public MessageType getType() { - return MessageType.STARTUP_TASK_REQUEST; + return MessageType.REGISTRATION_TASKS_REQUEST; } + } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java similarity index 90% rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java rename to 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java index b941343..13525e3 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java @@ -32,14 +32,14 @@ import org.apache.hyracks.control.nc.NCShutdownHook; import org.apache.hyracks.util.ExitUtil; -public class StartupTaskResponseMessage implements INCLifecycleMessage, INcAddressedMessage { +public class RegistrationTasksResponseMessage implements INCLifecycleMessage, INcAddressedMessage { - private static final Logger LOGGER = Logger.getLogger(StartupTaskResponseMessage.class.getName()); + private static final Logger LOGGER = Logger.getLogger(RegistrationTasksResponseMessage.class.getName()); private static final long serialVersionUID = 1L; private final String nodeId; private final List<INCLifecycleTask> tasks; - public StartupTaskResponseMessage(String nodeId, List<INCLifecycleTask> tasks) { + public RegistrationTasksResponseMessage(String nodeId, List<INCLifecycleTask> tasks) { this.nodeId = nodeId; this.tasks = tasks; } @@ -88,6 +88,6 @@ @Override public MessageType getType() { - return MessageType.STARTUP_TASK_RESPONSE; + return MessageType.REGISTRATION_TASKS_RESPONSE; } } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index 3d7f870..a18535d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -26,7 +26,7 @@ import java.util.logging.Logger; import org.apache.asterix.app.nc.NCAppRuntimeContext; -import 
org.apache.asterix.app.replication.message.StartupTaskRequestMessage; +import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage; import org.apache.asterix.common.api.AsterixThreadFactory; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.config.AsterixExtension; @@ -48,6 +48,7 @@ import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.application.IServiceContext; +import org.apache.hyracks.api.client.NodeStatus; import org.apache.hyracks.api.config.IConfigManager; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IFileDeviceResolver; @@ -67,7 +68,6 @@ private String nodeId; private boolean stopInitiated; private boolean startupCompleted; - private SystemState systemState; protected WebManager webManager; @Override @@ -117,9 +117,8 @@ this.ncServiceCtx.setMessagingChannelInterfaceFactory(interfaceFactory); IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager(); - systemState = recoveryMgr.getSystemState(); - - if (systemState == SystemState.PERMANENT_DATA_LOSS) { + final SystemState stateOnStartup = recoveryMgr.getSystemState(); + if (stateOnStartup == SystemState.PERMANENT_DATA_LOSS) { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("System state: " + SystemState.PERMANENT_DATA_LOSS); LOGGER.info("Node ID: " + nodeId); @@ -187,20 +186,27 @@ // Since we don't pass initial run flag in AsterixHyracksIntegrationUtil, we use the virtualNC flag final NodeProperties nodeProperties = runtimeContext.getNodeProperties(); - if (systemState == SystemState.PERMANENT_DATA_LOSS - && (nodeProperties.isInitialRun() || nodeProperties.isVirtualNc())) { - systemState = SystemState.BOOTSTRAPPING; + IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager(); + SystemState state 
= recoveryMgr.getSystemState(); + if (state == SystemState.PERMANENT_DATA_LOSS && (nodeProperties.isInitialRun() || nodeProperties + .isVirtualNc())) { + state = SystemState.BOOTSTRAPPING; } - // Request startup tasks from CC - StartupTaskRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(), systemState); + // Request registration tasks from CC + RegistrationTasksRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(), + NodeStatus.BOOTING, state); startupCompleted = true; } @Override public void onRegisterNode() throws Exception { if (startupCompleted) { - // Request startup tasks from CC - StartupTaskRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(), systemState); + /* + * If the node completed its startup before, then this is a re-registration with + * the CC and therefore the system state should be HEALTHY and the node status is ACTIVE + */ + RegistrationTasksRequestMessage.send((NodeControllerService) ncServiceCtx.getControllerService(), + NodeStatus.ACTIVE, SystemState.HEALTHY); } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java index 87b0856..cb9fa8f 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java @@ -22,16 +22,16 @@ public interface INCLifecycleMessage extends IMessage { - public enum MessageType { + enum MessageType { REPLAY_LOGS_REQUEST, REPLAY_LOGS_RESPONSE, PREPARE_FAILBACK_REQUEST, PREPARE_FAILBACK_RESPONSE, COMPLETE_FAILBACK_REQUEST, COMPLETE_FAILBACK_RESPONSE, - STARTUP_TASK_REQUEST, - STARTUP_TASK_RESPONSE, - STARTUP_TASK_RESULT, + REGISTRATION_TASKS_REQUEST, + REGISTRATION_TASKS_RESPONSE, + REGISTRATION_TASKS_RESULT, 
TAKEOVER_PARTITION_REQUEST, TAKEOVER_PARTITION_RESPONSE, TAKEOVER_METADATA_NODE_REQUEST, diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java similarity index 76% rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java index 277c0ba..3f8ced8 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java @@ -27,27 +27,27 @@ import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; import org.apache.asterix.common.transactions.IResourceIdManager; +import org.apache.asterix.transaction.management.service.transaction.TxnIdFactory; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.control.nc.NodeControllerService; -public class ReportMaxResourceIdMessage implements ICcAddressedMessage { +public class ReportLocalCountersMessage implements ICcAddressedMessage { private static final long serialVersionUID = 1L; - private static final Logger LOGGER = Logger.getLogger(ReportMaxResourceIdMessage.class.getName()); + private static final Logger LOGGER = Logger.getLogger(ReportLocalCountersMessage.class.getName()); private final long maxResourceId; + private final long maxTxnId; private final String src; - public ReportMaxResourceIdMessage(String src, long maxResourceId) { + public ReportLocalCountersMessage(String src, long maxResourceId, long maxTxnId) { this.src = src; this.maxResourceId = maxResourceId; - } - - public long 
getMaxResourceId() { - return maxResourceId; + this.maxTxnId = maxTxnId; } @Override public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { IResourceIdManager resourceIdManager = appCtx.getResourceIdManager(); + TxnIdFactory.ensureMinimumId(maxTxnId); resourceIdManager.report(src, maxResourceId); } @@ -56,17 +56,19 @@ INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext(); long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(), MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID); - ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage(ncs.getId(), maxResourceId); + long maxTxnId = appContext.getTransactionSubsystem().getTransactionManager().getMaxTxnId(); + ReportLocalCountersMessage countersMessage = + new ReportLocalCountersMessage(ncs.getId(), maxResourceId, maxTxnId); try { - ((INCMessageBroker) ncs.getContext().getMessageBroker()).sendMessageToCC(maxResourceIdMsg); + ((INCMessageBroker) ncs.getContext().getMessageBroker()).sendMessageToCC(countersMessage); } catch (Exception e) { - LOGGER.log(Level.SEVERE, "Unable to report max local resource id", e); + LOGGER.log(Level.SEVERE, "Unable to report local counters", e); throw HyracksDataException.create(e); } } @Override public String toString() { - return ReportMaxResourceIdMessage.class.getSimpleName(); + return ReportLocalCountersMessage.class.getSimpleName(); } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java similarity index 87% rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java index 
a43376d..785ad2f 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java @@ -23,16 +23,16 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.control.nc.NodeControllerService; -public class ReportMaxResourceIdRequestMessage implements INcAddressedMessage { +public class ReportLocalCountersRequestMessage implements INcAddressedMessage { private static final long serialVersionUID = 1L; @Override public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { - ReportMaxResourceIdMessage.send((NodeControllerService) appCtx.getServiceContext().getControllerService()); + ReportLocalCountersMessage.send((NodeControllerService) appCtx.getServiceContext().getControllerService()); } @Override public String toString() { - return ReportMaxResourceIdRequestMessage.class.getSimpleName(); + return ReportLocalCountersRequestMessage.class.getSimpleName(); } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java index decc1a9..a2f4aa1 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java @@ -61,7 +61,7 @@ private void requestMaxResourceID(IClusterStateManager clusterStateManager, IResourceIdManager resourceIdManager, ICCMessageBroker broker) throws Exception { Set<String> getParticipantNodes = clusterStateManager.getParticipantNodes(); - ReportMaxResourceIdRequestMessage msg = new ReportMaxResourceIdRequestMessage(); + ReportLocalCountersRequestMessage msg = new 
ReportLocalCountersRequestMessage(); for (String nodeId : getParticipantNodes) { if (!resourceIdManager.reported(nodeId)) { broker.sendApplicationMessageToNC(msg, nodeId); diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java index 71d7f56..eb59e74 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TxnIdFactory.java @@ -37,6 +37,6 @@ } public static void ensureMinimumId(long id) { - TxnIdFactory.id.set(id); + TxnIdFactory.id.updateAndGet(current -> Math.max(current, id)); } } \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java index b84f1f2..10a9a3c 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/NodeStatus.java @@ -19,6 +19,7 @@ package org.apache.hyracks.api.client; public enum NodeStatus { - ALIVE, + ACTIVE, + BOOTING, DEAD } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java index 3cd6235..4928564 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java @@ -136,7 +136,7 @@ public Map<String, NodeControllerInfo> getNodeControllerInfoMap() { Map<String, NodeControllerInfo> result = new LinkedHashMap<>(); nodeRegistry.forEach( - (key, ncState) -> result.put(key, new NodeControllerInfo(key, NodeStatus.ALIVE, ncState.getDataPort(), + (key, ncState) -> result.put(key, new NodeControllerInfo(key, NodeStatus.ACTIVE, ncState.getDataPort(), ncState.getDatasetPort(), ncState.getMessagingPort(), ncState.getCapacity().getCores()))); return result; } diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java index 445a15c..bb28c79 100644 --- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java @@ -97,10 +97,10 @@ Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(4, "nc", "10.0.0.", dataPort, resultPort, messagingPort); ncNameToNcInfos.put("nc7", - new NodeControllerInfo("nc7", NodeStatus.ALIVE, new NetworkAddress("10.0.0.7", dataPort), + new NodeControllerInfo("nc7", NodeStatus.ACTIVE, new NetworkAddress("10.0.0.7", dataPort), new NetworkAddress("10.0.0.5", resultPort), new NetworkAddress("10.0.0.5", messagingPort), 2)); ncNameToNcInfos.put("nc12", - new NodeControllerInfo("nc12", NodeStatus.ALIVE, new NetworkAddress("10.0.0.12", dataPort), + new NodeControllerInfo("nc12", NodeStatus.ACTIVE, new NetworkAddress("10.0.0.12", dataPort), new NetworkAddress("10.0.0.5", resultPort), new NetworkAddress("10.0.0.5", messagingPort), 2)); InputSplit[] fileSplits = new 
InputSplit[12]; diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java index c3d86e8..1814e85 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java @@ -135,7 +135,7 @@ String ncId = ncNamePrefix + i; String ncAddress = addressPrefix + i; ncNameToNcInfos.put(ncId, - new NodeControllerInfo(ncId, NodeStatus.ALIVE, new NetworkAddress(ncAddress, netPort), + new NodeControllerInfo(ncId, NodeStatus.ACTIVE, new NetworkAddress(ncAddress, netPort), new NetworkAddress(ncAddress, dataPort), new NetworkAddress(ncAddress, messagingPort), 2)); } return ncNameToNcInfos; -- To view, visit https://asterix-gerrit.ics.uci.edu/2151 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I6899c9e7d6e744ca92d0108556e086a23639d78b Gerrit-PatchSet: 5 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]>
