Murtadha Hubail has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2767
Change subject: [NO ISSUE][CLUS] Include NC Local Counters in Startup Message ...................................................................... [NO ISSUE][CLUS] Include NC Local Counters in Startup Message - user model changes: no - storage format changes: no - interface changes: yes Details: - Include NC local counters in the NC startup tasks completion message. - Ensure no NC is marked as active without its local counters reported. - Remove the need for individual NC local counters message. - Clean up ITxnIdFactory and IResourceIdManager APIs. Change-Id: Ief8b9d43783ea22810c6fdb29947a1284e32daee --- D asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java D asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java D asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java 16 files changed, 121 insertions(+), 209 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/67/2767/1 diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java deleted file mode 100644 index 53b13e8..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ReportLocalCountersTask.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.app.nc.task; - -import org.apache.asterix.common.api.INCLifecycleTask; -import org.apache.asterix.runtime.message.ReportLocalCountersMessage; -import org.apache.hyracks.api.control.CcId; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.service.IControllerService; -import org.apache.hyracks.control.nc.NodeControllerService; - -public class ReportLocalCountersTask implements INCLifecycleTask { - - private static final long serialVersionUID = 1L; - - @Override - public void perform(CcId ccId, IControllerService cs) throws HyracksDataException { - ReportLocalCountersMessage.send(ccId, (NodeControllerService) cs); - } - - @Override - public String toString() { - return "{ \"class\" : \"" + getClass().getSimpleName() + "\" }"; - } -} \ No newline at end of file diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java index 043e02e..b6b009f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java @@ -31,7 +31,6 @@ import org.apache.asterix.app.nc.task.ExternalLibrarySetupTask; import org.apache.asterix.app.nc.task.LocalRecoveryTask; import org.apache.asterix.app.nc.task.MetadataBootstrapTask; -import org.apache.asterix.app.nc.task.ReportLocalCountersTask; import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask; import org.apache.asterix.app.nc.task.StartReplicationServiceTask; import org.apache.asterix.app.nc.task.UpdateNodeStatusTask; @@ -79,7 +78,7 @@ @Override public void notifyNodeFailure(String nodeId) throws HyracksDataException { pendingStartupCompletionNodes.remove(nodeId); - clusterManager.updateNodePartitions(nodeId, false); + 
clusterManager.updateNodeState(nodeId, false, null); if (nodeId.equals(metadataNodeId)) { clusterManager.updateMetadataNode(metadataNodeId, false); } @@ -125,7 +124,7 @@ private void process(NCLifecycleTaskReportMessage msg) throws HyracksDataException { pendingStartupCompletionNodes.remove(msg.getNodeId()); if (msg.isSuccess()) { - clusterManager.updateNodePartitions(msg.getNodeId(), true); + clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters()); if (msg.getNodeId().equals(metadataNodeId)) { clusterManager.updateMetadataNode(metadataNodeId, true); } @@ -156,7 +155,6 @@ if (metadataNode) { tasks.add(new BindMetadataNodeTask()); } - tasks.add(new ReportLocalCountersTask()); return tasks; } @@ -203,7 +201,6 @@ tasks.add(new ExportMetadataNodeTask(true)); tasks.add(new BindMetadataNodeTask()); } - tasks.add(new ReportLocalCountersTask()); tasks.add(new UpdateNodeStatusTask(NodeStatus.ACTIVE)); return tasks; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java index 5ac3a0c..21022ed 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.app.replication.message; +import org.apache.asterix.common.utils.NcLocalCounters; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.messaging.api.ICcAddressedMessage; import org.apache.asterix.common.replication.INCLifecycleMessage; @@ -29,14 +30,16 @@ private final String nodeId; private final boolean success; private Throwable exception; + private final NcLocalCounters localCounters; - public NCLifecycleTaskReportMessage(String nodeId, boolean 
success) { + public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters) { this.nodeId = nodeId; this.success = success; + this.localCounters = localCounters; } @Override - public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + public void handle(ICcApplicationContext appCtx) throws HyracksDataException { appCtx.getNcLifecycleCoordinator().process(this); } @@ -56,6 +59,10 @@ this.exception = exception; } + public NcLocalCounters getLocalCounters() { + return localCounters; + } + @Override public MessageType getType() { return MessageType.REGISTRATION_TASKS_RESULT; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java index 868c2ad..2d2fe97 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java @@ -20,6 +20,7 @@ import java.util.List; +import org.apache.asterix.common.utils.NcLocalCounters; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.messaging.CcIdentifiedMessage; @@ -28,6 +29,7 @@ import org.apache.asterix.common.replication.INCLifecycleMessage; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.util.ExitUtil; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; @@ -68,7 +70,9 @@ success = false; exception = e; } - NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success); + 
NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(), + (NodeControllerService) appCtx.getServiceContext().getControllerService()) : null; + NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success, localCounter); result.setException(exception); try { broker.sendMessageToCC(getCcId(), result); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java index 4beb44a..9d367ac 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java @@ -31,14 +31,19 @@ import org.apache.asterix.common.cluster.IGlobalRecoveryManager; import org.apache.asterix.common.config.MetadataProperties; import org.apache.asterix.common.metadata.IMetadataBootstrap; +import org.apache.asterix.common.utils.NcLocalCounters; import org.apache.asterix.runtime.transaction.ResourceIdManager; +import org.apache.asterix.runtime.utils.BulkTxnIdFactory; import org.apache.asterix.runtime.utils.CcApplicationContext; import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.config.IApplicationConfig; +import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.JobIdFactory; +import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.application.CCServiceContext; import org.apache.hyracks.control.common.application.ConfigManagerApplicationConfig; import org.apache.hyracks.control.common.config.ConfigManager; @@ -194,8 +199,7 @@ private 
void notifyNodeStartupCompletion(CcApplicationContext applicationContext, String nodeId) throws HyracksDataException { - NCLifecycleTaskReportMessage msg = new NCLifecycleTaskReportMessage(nodeId, true); - applicationContext.getResourceIdManager().report(nodeId, 0); + NCLifecycleTaskReportMessage msg = new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters()); applicationContext.getNcLifecycleCoordinator().process(msg); } @@ -204,7 +208,12 @@ ConfigManager configManager = new ConfigManager(null); IApplicationConfig applicationConfig = new ConfigManagerApplicationConfig(configManager); ICCServiceContext iccServiceContext = Mockito.mock(CCServiceContext.class); + final ClusterControllerService ccs = Mockito.mock(ClusterControllerService.class); + JobIdFactory jobIdFactory = new JobIdFactory(CcId.valueOf(0)); + Mockito.when(ccs.getJobIdFactory()).thenReturn(jobIdFactory); Mockito.when(iccServiceContext.getAppConfig()).thenReturn(applicationConfig); + Mockito.when(iccServiceContext.getControllerService()).thenReturn(ccs); + Mockito.when(ccApplicationContext.getServiceContext()).thenReturn(iccServiceContext); NcLifecycleCoordinator coordinator = @@ -225,6 +234,9 @@ IGlobalRecoveryManager globalRecoveryManager = Mockito.mock(IGlobalRecoveryManager.class); Mockito.when(globalRecoveryManager.isRecoveryCompleted()).thenReturn(true); Mockito.when(ccApplicationContext.getGlobalRecoveryManager()).thenReturn(globalRecoveryManager); + + BulkTxnIdFactory bulkTxnIdFactory = new BulkTxnIdFactory(); + Mockito.when(ccApplicationContext.getTxnIdFactory()).thenReturn(bulkTxnIdFactory); return ccApplicationContext; } @@ -238,4 +250,12 @@ Mockito.when(metadataProperties.getNodePartitions()).thenReturn(nodePartitionsMap); return metadataProperties; } + + private NcLocalCounters mockLocalCounters() { + final NcLocalCounters localCounters = Mockito.mock(NcLocalCounters.class); + Mockito.when(localCounters.getMaxJobId()).thenReturn(1000L); + 
Mockito.when(localCounters.getMaxResourceId()).thenReturn(1000L); + Mockito.when(localCounters.getMaxTxnId()).thenReturn(1000L); + return localCounters; + } } \ No newline at end of file diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java index dda9ffd..5e99eec 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java @@ -25,6 +25,7 @@ import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.utils.NcLocalCounters; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.config.IOption; @@ -59,9 +60,10 @@ * * @param nodeId * @param active + * @param ncLocalCounters * @throws HyracksDataException */ - void updateNodePartitions(String nodeId, boolean active) throws HyracksDataException; + void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters) throws HyracksDataException; /** * Updates the active node and active state of the cluster partition with id {@code partitionNum} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java index ce49ccf..d36d383 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java +++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java @@ -18,14 +18,12 @@ */ package org.apache.asterix.common.transactions; -import org.apache.hyracks.api.exceptions.HyracksDataException; - public interface IResourceIdManager { long createResourceId(); boolean reported(String nodeId); - void report(String nodeId, long maxResourceId) throws HyracksDataException; + void report(String nodeId, long maxResourceId); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java index be4a1f8..5c28f3f 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITxnIdFactory.java @@ -36,7 +36,7 @@ * @param id * the value to ensure future created transaction ids are larger than */ - void ensureMinimumId(long id) throws AlgebricksException; + void ensureMinimumId(long id); /** * The highest transaction id this factory has created diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java new file mode 100644 index 0000000..7cd61d8 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.utils; + +import java.io.Serializable; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; +import org.apache.hyracks.api.control.CcId; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.control.nc.NodeControllerService; + +public class NcLocalCounters implements Serializable { + + private final long maxResourceId; + private final long maxTxnId; + private final long maxJobId; + + private NcLocalCounters(long maxResourceId, long maxTxnId, long maxJobId) { + this.maxResourceId = maxResourceId; + this.maxTxnId = maxTxnId; + this.maxJobId = maxJobId; + } + + public static NcLocalCounters collect(CcId ccId, NodeControllerService ncs) throws HyracksDataException { + final INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext(); + long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(), + MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID); + long maxTxnId = appContext.getMaxTxnId(); + long maxJobId = ncs.getMaxJobId(ccId); + return new NcLocalCounters(maxResourceId, maxTxnId, maxJobId); + } + + public long getMaxResourceId() { + return maxResourceId; + } + + public long getMaxTxnId() { + return maxTxnId; + } + + public long getMaxJobId() { + return maxJobId; + } +} diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java index 6b3b6a0..f530afe 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/CachingTxnIdFactory.java @@ -62,7 +62,7 @@ } @Override - public void ensureMinimumId(long id) throws AlgebricksException { + public void ensureMinimumId(long id) { throw new UnsupportedOperationException(); } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java deleted file mode 100644 index fe9a5b8..0000000 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersMessage.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.runtime.message; - -import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.common.messaging.api.ICcAddressedMessage; -import org.apache.asterix.common.messaging.api.INCMessageBroker; -import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; -import org.apache.asterix.common.transactions.IResourceIdManager; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.api.control.CcId; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.control.cc.ClusterControllerService; -import org.apache.hyracks.control.nc.NodeControllerService; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class ReportLocalCountersMessage implements ICcAddressedMessage { - private static final long serialVersionUID = 1L; - private static final Logger LOGGER = LogManager.getLogger(); - private final long maxResourceId; - private final long maxTxnId; - private final long maxJobId; - private final String src; - - public ReportLocalCountersMessage(String src, long maxResourceId, long maxTxnId, long maxJobId) { - this.src = src; - this.maxResourceId = maxResourceId; - this.maxTxnId = maxTxnId; - this.maxJobId = maxJobId; - } - - @Override - public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { - IResourceIdManager resourceIdManager = appCtx.getResourceIdManager(); - try { - appCtx.getTxnIdFactory().ensureMinimumId(maxTxnId); - } catch (AlgebricksException e) { - throw HyracksDataException.create(e); - } - resourceIdManager.report(src, maxResourceId); - ((ClusterControllerService) appCtx.getServiceContext().getControllerService()).getJobIdFactory() - .setMaxJobId(maxJobId); - } - - public static void send(CcId ccId, NodeControllerService 
ncs) throws HyracksDataException { - INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext(); - long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(), - MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID); - long maxTxnId = appContext.getMaxTxnId(); - long maxJobId = ncs.getMaxJobId(ccId); - ReportLocalCountersMessage countersMessage = - new ReportLocalCountersMessage(ncs.getId(), maxResourceId, maxTxnId, maxJobId); - try { - ((INCMessageBroker) ncs.getContext().getMessageBroker()).sendMessageToCC(ccId, countersMessage); - } catch (Exception e) { - LOGGER.log(Level.ERROR, "Unable to report local counters", e); - throw HyracksDataException.create(e); - } - } - - @Override - public String toString() { - return ReportLocalCountersMessage.class.getSimpleName(); - } -} diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java deleted file mode 100644 index 51f53e7..0000000 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportLocalCountersRequestMessage.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.runtime.message; - -import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.messaging.CcIdentifiedMessage; -import org.apache.asterix.common.messaging.api.INcAddressedMessage; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.control.nc.NodeControllerService; - -public class ReportLocalCountersRequestMessage extends CcIdentifiedMessage implements INcAddressedMessage { - private static final long serialVersionUID = 1L; - - @Override - public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { - ReportLocalCountersMessage.send(getCcId(), - (NodeControllerService) appCtx.getServiceContext().getControllerService()); - } - - @Override - public String toString() { - return ReportLocalCountersRequestMessage.class.getSimpleName(); - } -} diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java index 087913f..c767c52 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java @@ -49,23 +49,11 @@ new Exception("Cannot generate global resource id when cluster is not active.")); } else { response.setException(new Exception("One or more nodes has not reported max resource id.")); - requestMaxResourceID(clusterStateManager, resourceIdManager, broker); } } broker.sendApplicationMessageToNC(response, src); } catch (Exception e) { throw HyracksDataException.create(e); - } - } - - private void requestMaxResourceID(IClusterStateManager clusterStateManager, IResourceIdManager resourceIdManager, - 
ICCMessageBroker broker) throws Exception { - Set<String> getParticipantNodes = clusterStateManager.getParticipantNodes(true); - ReportLocalCountersRequestMessage msg = new ReportLocalCountersRequestMessage(); - for (String nodeId : getParticipantNodes) { - if (!resourceIdManager.reported(nodeId)) { - broker.sendApplicationMessageToNC(msg, nodeId); - } } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java index 6d3077e..279c459 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java @@ -25,6 +25,7 @@ import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.transactions.IResourceIdManager; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.util.annotations.NotThreadSafe; public class ResourceIdManager implements IResourceIdManager { @@ -48,11 +49,8 @@ } @Override - public void report(String nodeId, long maxResourceId) throws HyracksDataException { + public void report(String nodeId, long maxResourceId) { globalResourceId.updateAndGet(prev -> Math.max(maxResourceId, prev)); - if (reportedNodes.add(nodeId)) { - csm.refreshState(); - } + reportedNodes.add(nodeId); } - } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java index 542bc17..296ce9c 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/BulkTxnIdFactory.java @@ -24,7 +24,7 @@ import 
org.apache.asterix.common.transactions.ITxnIdFactory; import org.apache.asterix.common.transactions.TxnId; -class BulkTxnIdFactory implements ITxnIdFactory { +public class BulkTxnIdFactory implements ITxnIdFactory { private final AtomicLong maxId = new AtomicLong(); diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index 8539fa4..8d3187b 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -37,12 +37,14 @@ import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.replication.INcLifecycleCoordinator; import org.apache.asterix.common.transactions.IResourceIdManager; +import org.apache.asterix.common.utils.NcLocalCounters; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.config.Section; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.application.ConfigManagerApplicationConfig; import org.apache.hyracks.control.common.config.ConfigManager; import org.apache.hyracks.control.common.controllers.NCConfig; @@ -133,8 +135,9 @@ } @Override - public synchronized void updateNodePartitions(String nodeId, boolean active) { + public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters) { if (active) { + updateClusterCounters(nodeId, localCounters); participantNodes.add(nodeId); } else { participantNodes.remove(nodeId); 
@@ -181,15 +184,6 @@ if (clusterPartitions.values().stream().anyMatch(p -> !p.isActive() && !p.isPendingActivation())) { setState(ClusterState.UNUSABLE); return; - } - - IResourceIdManager resourceIdManager = appCtx.getResourceIdManager(); - for (String node : participantNodes) { - if (!resourceIdManager.reported(node)) { - LOGGER.info("Partitions are ready but {} has not yet registered its max resource id...", node); - setState(ClusterState.UNUSABLE); - return; - } } // the metadata bootstrap & global recovery must be complete before the cluster can be active if (metadataNodeActive) { @@ -452,6 +446,14 @@ return metadataPartition; } + private void updateClusterCounters(String nodeId, NcLocalCounters localCounters) { + final IResourceIdManager resourceIdManager = appCtx.getResourceIdManager(); + resourceIdManager.report(nodeId, localCounters.getMaxResourceId()); + appCtx.getTxnIdFactory().ensureMinimumId(localCounters.getMaxTxnId()); + ((ClusterControllerService) appCtx.getServiceContext().getControllerService()).getJobIdFactory() + .setMaxJobId(localCounters.getMaxJobId()); + } + private void updateNodeConfig(String nodeId, Map<IOption, Object> configuration) { ConfigManager configManager = ((ConfigManagerApplicationConfig) appCtx.getServiceContext().getAppConfig()).getConfigManager(); -- To view, visit https://asterix-gerrit.ics.uci.edu/2767 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ief8b9d43783ea22810c6fdb29947a1284e32daee Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]>
