Michael Blow has submitted this change and it was merged. Change subject: Wait For Metadata Registration Before Active, Etc. ......................................................................
Wait For Metadata Registration Before Active, Etc. Also: - metadata.port moved to [app] and is now honored - Add metadata.node to [app] to optionally specify the metadata node - Add metadata.callback.port to [app] - Decrease the metadata registration timeout from 7 days to a default of one minute, configurable via the metadata.registration.timeout.secs property - Log swallowed exception in SynchronizableWork - Add missing (metadata) properties to the cluster state HTTP API - Make AsterixPropertiesAccessor a singleton, to ensure consistent values when accessed in a virtual cluster Change-Id: I48d7c10b3e43181ec307f7d890ba721f61bc2ab0 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1247 Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java M asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixProperties.java M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataBootstrap.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IAsterixStateProxy.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java M asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/SynchronizableWork.java M hyracks-fullstack/hyracks/hyracks-server/src/test/java/org/apache/hyracks/server/test/NCServiceIT.java 44 files changed, 456 insertions(+), 253 deletions(-) Approvals: Till Westmann: Looks good to me, approved Jenkins: Verified; Verified Objections: Jenkins: Violations found diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java index 392bec8..b18b669 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java @@ -54,7 +54,7 @@ } @Override - public void handle(IControllerService cs) throws HyracksDataException { + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { NodeControllerService ncs = (NodeControllerService) cs; IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java index fc67d3c..e4a57e6 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java @@ -65,7 +65,7 @@ } @Override - public void handle(IControllerService cs) throws HyracksDataException { + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { ActiveLifecycleListener.INSTANCE.receive(this); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java index dc0087b..7750ab0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java @@ -64,7 
+64,7 @@ public void init(boolean deleteOldInstanceData) throws Exception { ncs = new NodeControllerService[0]; // ensure that ncs is not null - propertiesAccessor = new AsterixPropertiesAccessor(); + propertiesAccessor = AsterixPropertiesAccessor.getInstance(); if (deleteOldInstanceData) { deleteTransactionLogs(); removeTestStorageFiles(); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java index ed081b5..c2c214c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java @@ -75,7 +75,6 @@ import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory; import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem; -import org.apache.hyracks.api.application.IApplicationConfig; import org.apache.hyracks.api.application.INCApplicationContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IIOManager; @@ -134,26 +133,17 @@ private IReplicationManager replicationManager; private IRemoteRecoveryManager remoteRecoveryManager; private IReplicaResourcesManager replicaResourcesManager; - private final int metadataRmiPort; private final ILibraryManager libraryManager; private final NCExtensionManager ncExtensionManager; - public AsterixNCAppRuntimeContext(INCApplicationContext ncApplicationContext, int metadataRmiPort, - List<AsterixExtension> extensions) throws AsterixException, InstantiationException, IllegalAccessException, + public AsterixNCAppRuntimeContext(INCApplicationContext ncApplicationContext, List<AsterixExtension> extensions) + throws AsterixException, 
InstantiationException, IllegalAccessException, ClassNotFoundException, IOException { List<AsterixExtension> allExtensions = new ArrayList<>(); this.ncApplicationContext = ncApplicationContext; - // Determine whether to use old-style asterix-configuration.xml or new-style configuration. - // QQQ strip this out eventually - AsterixPropertiesAccessor propertiesAccessor; - IApplicationConfig cfg = ncApplicationContext.getAppConfig(); - // QQQ this is NOT a good way to determine whether the config is valid - if (cfg.getString("cc", "cluster.address") != null) { - propertiesAccessor = new AsterixPropertiesAccessor(cfg); - } else { - propertiesAccessor = new AsterixPropertiesAccessor(); - } + AsterixPropertiesAccessor propertiesAccessor = + AsterixPropertiesAccessor.getInstance(ncApplicationContext.getAppConfig()); compilerProperties = new AsterixCompilerProperties(propertiesAccessor); externalProperties = new AsterixExternalProperties(propertiesAccessor); metadataProperties = new AsterixMetadataProperties(propertiesAccessor); @@ -163,7 +153,6 @@ buildProperties = new AsterixBuildProperties(propertiesAccessor); replicationProperties = new AsterixReplicationProperties(propertiesAccessor); messagingProperties = new MessagingProperties(propertiesAccessor); - this.metadataRmiPort = metadataRmiPort; libraryManager = new ExternalLibraryManager(); if (extensions != null) { allExtensions.addAll(extensions); @@ -458,7 +447,7 @@ // This is a special case, we just give the metadataNode directly. // This way we can delay the registration of the metadataNode until // it is completely initialized. 
- MetadataManager.instantiate(new MetadataManager(proxy, MetadataNode.INSTANCE)); + MetadataManager.initialize(proxy, MetadataNode.INSTANCE); MetadataBootstrap.startUniverse(this, ncApplicationContext, newUniverse); MetadataBootstrap.startDDLRecovery(); ncExtensionManager.initializeMetadata(); @@ -470,7 +459,8 @@ @Override public void exportMetadataNodeStub() throws RemoteException { - IMetadataNode stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, metadataRmiPort); + IMetadataNode stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, + getMetadataProperties().getMetadataPort()); ((IAsterixStateProxy) ncApplicationContext.getDistributedState()).setMetadataNode(stub); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java index 9120aa5..764b559 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java @@ -71,6 +71,7 @@ import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager; import org.apache.hyracks.api.messages.IMessageBroker; import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.common.controllers.CCConfig; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; @@ -88,7 +89,8 @@ @Override public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception { - IMessageBroker messageBroker = new CCMessageBroker((ClusterControllerService) ccAppCtx.getControllerService()); + final ClusterControllerService controllerService = (ClusterControllerService) ccAppCtx.getControllerService(); + IMessageBroker messageBroker = new 
CCMessageBroker(controllerService); this.appCtx = ccAppCtx; if (LOGGER.isLoggable(Level.INFO)) { @@ -101,20 +103,21 @@ AsterixResourceIdManager resourceIdManager = new AsterixResourceIdManager(); ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false); AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection(), GlobalRecoveryManager.instance(), - libraryManager, resourceIdManager); + libraryManager, resourceIdManager, () -> MetadataManager.INSTANCE); ccExtensionManager = new CompilerExtensionManager(getExtensions()); AsterixAppContextInfo.INSTANCE.setExtensionManager(ccExtensionManager); - if (System.getProperty("java.rmi.server.hostname") == null) { - System.setProperty("java.rmi.server.hostname", - ((ClusterControllerService) ccAppCtx.getControllerService()).getCCConfig().clusterNetIpAddress); - } + final CCConfig ccConfig = controllerService.getCCConfig(); - setAsterixStateProxy(AsterixStateProxy.registerRemoteObject()); + if (System.getProperty("java.rmi.server.hostname") == null) { + System.setProperty("java.rmi.server.hostname", ccConfig.clusterNetIpAddress); + } + AsterixMetadataProperties metadataProperties = AsterixAppContextInfo.INSTANCE.getMetadataProperties(); + + setAsterixStateProxy(AsterixStateProxy.registerRemoteObject(metadataProperties.getMetadataCallbackPort())); appCtx.setDistributedState(proxy); - AsterixMetadataProperties metadataProperties = AsterixAppContextInfo.INSTANCE.getMetadataProperties(); - MetadataManager.instantiate(new MetadataManager(proxy, metadataProperties)); + MetadataManager.initialize(proxy, metadataProperties); AsterixAppContextInfo.INSTANCE.getCCApplicationContext() .addJobLifecycleListener(ActiveLifecycleListener.INSTANCE); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java index 4eaab2d..75cbe44 100644 --- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java @@ -42,6 +42,7 @@ import org.apache.asterix.metadata.cluster.RemoveNodeWorkResponse; import org.apache.asterix.runtime.util.ClusterStateManager; import org.apache.hyracks.api.application.IClusterLifecycleListener; +import org.apache.hyracks.api.exceptions.HyracksException; public class ClusterLifecycleListener implements IClusterLifecycleListener { @@ -64,13 +65,16 @@ } @Override - public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration) { + public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration) throws HyracksException { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("NC: " + nodeId + " joined"); } ClusterStateManager.INSTANCE.addNCConfiguration(nodeId, ncConfiguration); + //if metadata node rejoining, we need to rebind the proxy connection when it is active again. 
- MetadataManager.INSTANCE.rebindMetadataNode = !ClusterStateManager.INSTANCE.isMetadataNodeActive(); + if (!ClusterStateManager.INSTANCE.isMetadataNodeActive()) { + MetadataManager.INSTANCE.rebindMetadataNode(); + } Set<String> nodeAddition = new HashSet<String>(); nodeAddition.add(nodeId); @@ -88,7 +92,7 @@ } @Override - public void notifyNodeFailure(Set<String> deadNodeIds) { + public void notifyNodeFailure(Set<String> deadNodeIds) throws HyracksException { for (String deadNode : deadNodeIds) { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("NC: " + deadNode + " left"); @@ -96,7 +100,9 @@ ClusterStateManager.INSTANCE.removeNCConfiguration(deadNode); //if metadata node failed, we need to rebind the proxy connection when it is active again - MetadataManager.INSTANCE.rebindMetadataNode = !ClusterStateManager.INSTANCE.isMetadataNodeActive(); + if (!ClusterStateManager.INSTANCE.isMetadataNodeActive()) { + MetadataManager.INSTANCE.rebindMetadataNode(); + } } updateProgress(ClusterEventType.NODE_FAILURE, deadNodeIds); Set<IClusterEventsSubscriber> subscribers = ClusterManager.INSTANCE.getRegisteredClusterEventSubscribers(); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java index f26afa8..e6f3142 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java @@ -61,10 +61,6 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint { private static final Logger LOGGER = Logger.getLogger(NCApplicationEntryPoint.class.getName()); - @Option(name = "-metadata-port", usage = "IP port to bind metadata listener (default: random port)", - required = false) - public int metadataRmiPort = 0; - @Option(name = "-initial-run", usage = "A 
flag indicating if it's the first time the NC is started (default: false)", required = false) public boolean initialRun = false; @@ -94,7 +90,6 @@ parser.printUsage(System.err); throw e; } - ncAppCtx.setThreadFactory(new AsterixThreadFactory(ncAppCtx.getThreadFactory(), ncAppCtx.getLifeCycleComponentManager())); ncApplicationContext = ncAppCtx; @@ -103,11 +98,13 @@ LOGGER.info("Starting Asterix node controller: " + nodeId); } + final NodeControllerService controllerService = (NodeControllerService) ncAppCtx.getControllerService(); + if (System.getProperty("java.rmi.server.hostname") == null) { - System.setProperty("java.rmi.server.hostname", ((NodeControllerService) ncAppCtx.getControllerService()) + System.setProperty("java.rmi.server.hostname", (controllerService) .getConfiguration().clusterNetPublicIPAddress); } - runtimeContext = new AsterixNCAppRuntimeContext(ncApplicationContext, metadataRmiPort, getExtensions()); + runtimeContext = new AsterixNCAppRuntimeContext(ncApplicationContext, getExtensions()); AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext) .getMetadataProperties(); if (!metadataProperties.getNodeNames().contains(ncApplicationContext.getNodeId())) { @@ -120,8 +117,7 @@ ncApplicationContext.setApplicationObject(runtimeContext); MessagingProperties messagingProperties = ((IAsterixPropertiesProvider) runtimeContext) .getMessagingProperties(); - messageBroker = new NCMessageBroker((NodeControllerService) ncAppCtx.getControllerService(), - messagingProperties); + messageBroker = new NCMessageBroker(controllerService, messagingProperties); ncApplicationContext.setMessageBroker(messageBroker); MessagingChannelInterfaceFactory interfaceFactory = new MessagingChannelInterfaceFactory( (NCMessageBroker) messageBroker, messagingProperties); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm index ea002ab..d076f74 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm @@ -7,20 +7,53 @@ "config": { "api.port": 19002, "cc.java.opts": "-Xmx1024m", + "cluster.partitions": { + "0": "ID:0, Original Node: asterix_nc1, IODevice: 0, Active Node: asterix_nc1", + "1": "ID:1, Original Node: asterix_nc1, IODevice: 1, Active Node: asterix_nc1", + "2": "ID:2, Original Node: asterix_nc2, IODevice: 0, Active Node: asterix_nc2", + "3": "ID:3, Original Node: asterix_nc2, IODevice: 1, Active Node: asterix_nc2" + }, "compiler.framesize": 32768, "compiler.groupmemory": 163840, "compiler.joinmemory": 163840, "compiler.pregelix.home": "~/pregelix", "compiler.sortmemory": 327680, + "core.dump.paths": {}, "feed.central.manager.port": 4500, "feed.max.threshold.period": 5, "feed.memory.available.wait.timeout": 10, "feed.memory.global.budget": 67108864, "feed.pending.work.threshold": 50, "feed.port": 19003, + "instance.name": null, "log.level": "INFO", "max.wait.active.cluster": 60, + "metadata.callback.port": 0, + "metadata.node": "asterix_nc1", + "metadata.partition": "ID:0, Original Node: asterix_nc1, IODevice: 0, Active Node: asterix_nc1", + "metadata.port": 0, + "metadata.registration.timeout.secs": 60, "nc.java.opts": "-Xmx1024m", + "node.partitions": { + "asterix_nc1": [ + "ID:0, Original Node: asterix_nc1, IODevice: 0, Active Node: asterix_nc1", + "ID:1, Original Node: asterix_nc1, IODevice: 1, Active Node: asterix_nc1" + ], + "asterix_nc2": [ + "ID:2, Original Node: asterix_nc2, IODevice: 0, Active Node: asterix_nc2", + "ID:3, Original Node: asterix_nc2, IODevice: 1, Active Node: asterix_nc2" + ] + }, + "node.stores": { + "asterix_nc1": [ + "iodevice0", + "iodevice1" + ], + "asterix_nc2": [ + "iodevice0", + 
"iodevice1" + ] + }, "plot.activate": false, "replication.enabled": false, "replication.factor": 2, @@ -38,6 +71,10 @@ "storage.memorycomponent.numpages": 8, "storage.memorycomponent.pagesize": 131072, "storage.metadata.memorycomponent.numpages": 256, + "transaction.log.dirs": { + "asterix_nc1": "target/txnLogDir/asterix_nc1", + "asterix_nc2": "target/txnLogDir/asterix_nc2" + }, "txn.commitprofiler.reportinterval": 5, "txn.job.recovery.memorysize": 67108864, "txn.lock.escalationthreshold": 1000, @@ -95,4 +132,4 @@ "shutdownUri": "http://127.0.0.1:19002/admin/shutdown", "state": "ACTIVE", "versionUri": "http://127.0.0.1:19002/admin/version" -} \ No newline at end of file +} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java index adf8e38..323df65 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java @@ -27,6 +27,7 @@ public enum ClusterState { STARTING, + PENDING, ACTIVE, UNUSABLE, REBALANCING diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java index 6cd44a7..cc27fbb 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java @@ -57,17 +57,16 @@ @Override public ClusterPartition clone() { - ClusterPartition clone = new ClusterPartition(partitionId, nodeId, ioDeviceNum); - return clone; + return new ClusterPartition(partitionId, nodeId, ioDeviceNum); } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("ID:" + partitionId); 
- sb.append(" Original Node: " + nodeId); - sb.append(" IODevice: " + ioDeviceNum); - sb.append(" Active Node: " + activeNodeId); + sb.append(", Original Node: " + nodeId); + sb.append(", IODevice: " + ioDeviceNum); + sb.append(", Active Node: " + activeNodeId); return sb.toString(); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java index 677fc78..3584f2b 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java @@ -26,22 +26,35 @@ public class AsterixMetadataProperties extends AbstractAsterixProperties { + private static final String METADATA_REGISTRATION_TIMEOUT_KEY = "metadata.registration.timeout.secs"; + private static final long METADATA_REGISTRATION_TIMEOUT_DEFAULT = 60; + + private static final String METADATA_PORT_KEY = "metadata.port"; + private static final int METADATA_PORT_DEFAULT = 0; + + private static final String METADATA_CALLBACK_PORT_KEY = "metadata.callback.port"; + private static final int METADATA_CALLBACK_PORT_DEFAULT = 0; + public AsterixMetadataProperties(AsterixPropertiesAccessor accessor) { super(accessor); } + @PropertyKey("instance.name") public String getInstanceName() { return accessor.getInstanceName(); } + @PropertyKey("metadata.node") public String getMetadataNodeName() { return accessor.getMetadataNodeName(); } + @PropertyKey("metadata.partition") public ClusterPartition getMetadataPartition() { return accessor.getMetadataPartition(); } + @PropertyKey("node.stores") public Map<String, String[]> getStores() { return accessor.getStores(); } @@ -54,19 +67,41 @@ return accessor.getCoredumpPath(nodeId); } + @PropertyKey("core.dump.paths") public Map<String, String> getCoredumpPaths() { return 
accessor.getCoredumpConfig(); } + @PropertyKey("node.partitions") public Map<String, ClusterPartition[]> getNodePartitions() { return accessor.getNodePartitions(); } + @PropertyKey("cluster.partitions") public SortedMap<Integer, ClusterPartition> getClusterPartitions() { return accessor.getClusterPartitions(); } + @PropertyKey("transaction.log.dirs") public Map<String, String> getTransactionLogDirs() { return accessor.getTransactionLogDirs(); } + + @PropertyKey(METADATA_REGISTRATION_TIMEOUT_KEY) + public long getRegistrationTimeoutSecs() { + return accessor.getProperty(METADATA_REGISTRATION_TIMEOUT_KEY, METADATA_REGISTRATION_TIMEOUT_DEFAULT, + PropertyInterpreters.getLongPropertyInterpreter()); + } + + @PropertyKey(METADATA_PORT_KEY) + public int getMetadataPort() { + return accessor.getProperty(METADATA_PORT_KEY, METADATA_PORT_DEFAULT, + PropertyInterpreters.getIntegerPropertyInterpreter()); + } + + @PropertyKey(METADATA_CALLBACK_PORT_KEY) + public int getMetadataCallbackPort() { + return accessor.getProperty(METADATA_CALLBACK_PORT_KEY, METADATA_CALLBACK_PORT_DEFAULT, + PropertyInterpreters.getIntegerPropertyInterpreter()); + } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixProperties.java index 1576774..3ae2bd9 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixProperties.java @@ -34,7 +34,7 @@ public static final String PROPERTY_CLUSTER_ADDRESS = "cluster.address"; public static final String PROPERTY_INSTANCE_NAME = "instance"; public static final String DEFAULT_INSTANCE_NAME = "DEFAULT_INSTANCE"; - public static final String PROPERTY_METADATA_PORT = "metadata.port"; + public static final String PROPERTY_METADATA_NODE = "metadata.node"; public static final String 
PROPERTY_COREDUMP_DIR = "coredumpdir"; public static final String DEFAULT_COREDUMP_DIR = String.join(File.separator, ASTERIXDB, "coredump"); public static final String PROPERTY_TXN_LOG_DIR = "txnlogdir"; diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java index 61cb618..a12d802 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java @@ -31,6 +31,7 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicReference; import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBException; @@ -54,6 +55,7 @@ public class AsterixPropertiesAccessor { private static final Logger LOGGER = Logger.getLogger(AsterixPropertiesAccessor.class.getName()); + private static final AtomicReference<AsterixPropertiesAccessor> instanceHolder = new AtomicReference<>(); private final String instanceName; private final String metadataNodeName; private final List<String> nodeNames = new ArrayList<>();; @@ -76,7 +78,7 @@ * @throws AsterixException * @throws IOException */ - public AsterixPropertiesAccessor() throws AsterixException, IOException { + private AsterixPropertiesAccessor() throws AsterixException, IOException { String fileName = System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY); if (fileName == null) { fileName = GlobalConfig.DEFAULT_CONFIG_FILE_NAME; @@ -164,25 +166,24 @@ /** * Constructor which wraps an IApplicationConfig. 
*/ - public AsterixPropertiesAccessor(IApplicationConfig cfg) throws AsterixException { + private AsterixPropertiesAccessor(IApplicationConfig cfg) throws AsterixException { this.cfg = cfg; instanceName = cfg.getString(AsterixProperties.SECTION_ASTERIX, AsterixProperties.PROPERTY_INSTANCE_NAME, AsterixProperties.DEFAULT_INSTANCE_NAME); - String mdNode = null; nodePartitionsMap = new HashMap<>(); MutableInt uniquePartitionId = new MutableInt(0); extensions = new ArrayList<>(); // Iterate through each configured NC. for (String section : cfg.getSections()) { if (section.startsWith(AsterixProperties.SECTION_PREFIX_NC)) { - mdNode = configureNc(section, mdNode, uniquePartitionId); + configureNc(section, uniquePartitionId); } else if (section.startsWith(AsterixProperties.SECTION_PREFIX_EXTENSION)) { String className = AsterixProperties.getSectionId(AsterixProperties.SECTION_PREFIX_EXTENSION, section); configureExtension(className, section); } } - - metadataNodeName = mdNode; + metadataNodeName = getProperty(AsterixProperties.PROPERTY_METADATA_NODE, + nodeNames.isEmpty() ? "" : nodeNames.get(0), PropertyInterpreters.getStringPropertyInterpreter()); asterixConfigurationParams = null; loadAsterixBuildProperties(); } @@ -197,16 +198,8 @@ extensions.add(new AsterixExtension(className, kvs)); } - private String configureNc(String section, String mdNode, MutableInt uniquePartitionId) { + private void configureNc(String section, MutableInt uniquePartitionId) { String ncId = AsterixProperties.getSectionId(AsterixProperties.SECTION_PREFIX_NC, section); - String newMetadataNode = mdNode; - - // Here we figure out which is the metadata node. If any NCs - // declare "metadata.port", use that one; otherwise just use the first. - if (mdNode == null || cfg.getString(section, AsterixProperties.PROPERTY_METADATA_PORT) != null) { - // QQQ But we don't actually *honor* metadata.port yet! - newMetadataNode = ncId; - } // Now we assign the coredump and txnlog directories for this node. 
// QQQ Default values? Should they be specified here? Or should there @@ -225,7 +218,7 @@ String[] nodeStores = new String[iodevices.length]; ClusterPartition[] nodePartitions = new ClusterPartition[iodevices.length]; for (int i = 0; i < nodePartitions.length; i++) { - // Construct final storage path from iodevice dir + storage subdir. + // Construct final storage path from iodevice dir + storage subdir.s nodeStores[i] = iodevices[i] + File.separator + storageSubdir; // Create ClusterPartition instances for this NC. ClusterPartition partition = new ClusterPartition(uniquePartitionId.getValue(), ncId, i); @@ -236,7 +229,6 @@ stores.put(ncId, nodeStores); nodePartitionsMap.put(ncId, nodePartitions); nodeNames.add(ncId); - return newMetadataNode; } private void loadAsterixBuildProperties() throws AsterixException { @@ -334,4 +326,24 @@ public List<AsterixExtension> getExtensions() { return extensions; } + + public static AsterixPropertiesAccessor getInstance(IApplicationConfig cfg) throws IOException, AsterixException { + // Determine whether to use old-style asterix-configuration.xml or new-style configuration. 
+ // QQQ strip this out eventually + // QQQ this is NOT a good way to determine whether the config is valid + AsterixPropertiesAccessor propertiesAccessor; + if (cfg != null && cfg.getString("cc", "cluster.address") != null) { + propertiesAccessor = new AsterixPropertiesAccessor(cfg); + } else { + propertiesAccessor = new AsterixPropertiesAccessor(); + } + if (!instanceHolder.compareAndSet(null, propertiesAccessor)) { + propertiesAccessor = instanceHolder.get(); + } + return propertiesAccessor; + } + + public static AsterixPropertiesAccessor getInstance() throws IOException, AsterixException { + return getInstance(null); + } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java index dbd2139..6e8c4cf 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java @@ -28,5 +28,5 @@ /** * handle the message upon delivery */ - public void handle(IControllerService cs) throws HyracksDataException, InterruptedException; + void handle(IControllerService cs) throws HyracksDataException, InterruptedException; } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataBootstrap.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataBootstrap.java new file mode 100644 index 0000000..940ec60 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataBootstrap.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.metadata; + +import org.apache.hyracks.api.exceptions.HyracksDataException; + +@FunctionalInterface +public interface IMetadataBootstrap { + /** + * Initializes the metadata manager, e.g., finds the remote metadata node. + */ + void init() throws HyracksDataException; +} diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java index d1efe11..8a3392a 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java @@ -18,26 +18,47 @@ */ package org.apache.asterix.metadata; +import java.util.logging.Level; +import java.util.logging.Logger; + /** * Periodically recycle temporary datasets. * * @author yingyib */ public class GarbageCollector implements Runnable { + private static final Logger LOGGER = Logger.getLogger(GarbageCollector.class.getName()); - private static long CLEANUP_PERIOD = 3600 * 24; + private static final long CLEANUP_PERIOD = 3600L * 24; - @Override - public void run() { - try { - synchronized (this) { - this.wait(CLEANUP_PERIOD); - } - MetadataManager.INSTANCE.cleanupTempDatasets(); - } catch (Exception e) { - // Prints the stack trace to log. 
- e.printStackTrace(); - } + static { + // Starts the garbage collector thread which + // should always be running. + Thread gcThread = new Thread(new GarbageCollector(), "Metadata GC"); + gcThread.setDaemon(true); + gcThread.start(); } + @Override + @SuppressWarnings("squid:S2142") // rethrow or interrupt thread on InterruptedException + public void run() { + LOGGER.info("Starting Metadata GC"); + while (true) { + try { + synchronized (this) { + this.wait(CLEANUP_PERIOD); + } + MetadataManager.INSTANCE.cleanupTempDatasets(); + } catch (InterruptedException e) { + break; + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Exception cleaning temp datasets", e); + } + } + LOGGER.info("Exiting Metadata GC"); + } + + public static void ensure() { + // no need to do anything, <clinit> does the work + } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java index 70da097..6a324a1 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java @@ -22,6 +22,7 @@ import java.rmi.RemoteException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -49,6 +50,7 @@ import org.apache.asterix.metadata.entities.NodeGroup; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.transaction.management.service.transaction.JobIdFactory; +import org.apache.hyracks.api.exceptions.HyracksDataException; /** * Provides access to Asterix metadata via remote methods to the metadata node. @@ -83,71 +85,34 @@ * with transaction ids of regular jobs or other metadata transactions. 
*/ public class MetadataManager implements IMetadataManager { - private static final int INITIAL_SLEEP_TIME = 64; - private static final int RETRY_MULTIPLIER = 5; - private static final int MAX_RETRY_COUNT = 10; - - // Set in init(). - public static MetadataManager INSTANCE; private final MetadataCache cache = new MetadataCache(); - private final IAsterixStateProxy proxy; - private IMetadataNode metadataNode; + protected final IAsterixStateProxy proxy; + protected IMetadataNode metadataNode; private final ReadWriteLock metadataLatch; - private final AsterixMetadataProperties metadataProperties; - public boolean rebindMetadataNode = false; + protected boolean rebindMetadataNode = false; - public MetadataManager(IAsterixStateProxy proxy, AsterixMetadataProperties metadataProperties) { - if (proxy == null) { - throw new Error("Null proxy given to MetadataManager."); + // TODO(mblow): replace references of this (non-constant) field with a method, update field name accordingly + public static IMetadataManager INSTANCE; + + private MetadataManager(IAsterixStateProxy proxy, IMetadataNode metadataNode) { + this(proxy); + if (metadataNode == null) { + throw new IllegalArgumentException("Null metadataNode given to MetadataManager"); } - this.proxy = proxy; - this.metadataProperties = metadataProperties; - this.metadataNode = null; - this.metadataLatch = new ReentrantReadWriteLock(true); + this.metadataNode = metadataNode; } - public MetadataManager(IAsterixStateProxy proxy, IMetadataNode metadataNode) { - if (metadataNode == null) { - throw new Error("Null metadataNode given to MetadataManager."); + private MetadataManager(IAsterixStateProxy proxy) { + if (proxy == null) { + throw new IllegalArgumentException("Null proxy given to MetadataManager"); } this.proxy = proxy; - this.metadataProperties = null; - this.metadataNode = metadataNode; this.metadataLatch = new ReentrantReadWriteLock(true); } @Override - public void init() throws RemoteException, MetadataException { - // 
Could be synchronized on any object. Arbitrarily chose proxy. - synchronized (proxy) { - if (metadataNode != null && !rebindMetadataNode) { - return; - } - try { - int retry = 0; - int sleep = INITIAL_SLEEP_TIME; - while (retry++ < MAX_RETRY_COUNT) { - metadataNode = proxy.getMetadataNode(); - if (metadataNode != null) { - rebindMetadataNode = false; - break; - } - Thread.sleep(sleep); - sleep *= RETRY_MULTIPLIER; - } - } catch (InterruptedException e) { - throw new MetadataException(e); - } - if (metadataNode == null) { - throw new Error("Failed to get the MetadataNode.\n" + "The MetadataNode was configured to run on NC: " - + metadataProperties.getMetadataNodeName()); - } - } - - // Starts the garbage collector thread which - // should always be running. - Thread garbageCollectorThread = new Thread(new GarbageCollector()); - garbageCollectorThread.start(); + public void init() throws HyracksDataException { + GarbageCollector.ensure(); } @Override @@ -243,7 +208,7 @@ @Override public List<Dataset> getDataverseDatasets(MetadataTransactionContext ctx, String dataverseName) throws MetadataException { - List<Dataset> dataverseDatasets = new ArrayList<Dataset>(); + List<Dataset> dataverseDatasets = new ArrayList<>(); // add uncommitted temporary datasets for (Dataset dataset : ctx.getDataverseDatasets(dataverseName)) { if (dataset.getDatasetDetails().isTemp()) { @@ -339,7 +304,7 @@ @Override public List<Index> getDatasetIndexes(MetadataTransactionContext ctx, String dataverseName, String datasetName) throws MetadataException { - List<Index> datasetIndexes = new ArrayList<Index>(); + List<Index> datasetIndexes = new ArrayList<>(); Dataset dataset = findDataset(ctx, dataverseName, datasetName); if (dataset == null) { return datasetIndexes; @@ -373,7 +338,7 @@ public CompactionPolicy getCompactionPolicy(MetadataTransactionContext ctx, String dataverse, String policyName) throws MetadataException { - CompactionPolicy compactionPolicy = null; + CompactionPolicy 
compactionPolicy; try { compactionPolicy = metadataNode.getCompactionPolicy(ctx.getJobId(), dataverse, policyName); } catch (RemoteException e) { @@ -434,7 +399,7 @@ ARecordType aRecType = (ARecordType) datatype.getDatatype(); return new Datatype( datatype.getDataverseName(), datatype.getDatatypeName(), new ARecordType(aRecType.getTypeName(), - aRecType.getFieldNames(), aRecType.getFieldTypes(), aRecType.isOpen()), + aRecType.getFieldNames(), aRecType.getFieldTypes(), aRecType.isOpen()), datatype.getIsAnonymous()); } try { @@ -710,7 +675,7 @@ @Override public DatasourceAdapter getAdapter(MetadataTransactionContext ctx, String dataverseName, String name) throws MetadataException { - DatasourceAdapter adapter = null; + DatasourceAdapter adapter; try { adapter = metadataNode.getAdapter(ctx.getJobId(), dataverseName, name); } catch (RemoteException e) { @@ -733,7 +698,7 @@ @Override public List<Library> getDataverseLibraries(MetadataTransactionContext ctx, String dataverseName) throws MetadataException { - List<Library> dataverseLibaries = null; + List<Library> dataverseLibaries; try { // Assuming that the transaction can read its own writes on the // metadata node. 
@@ -759,7 +724,7 @@ @Override public Library getLibrary(MetadataTransactionContext ctx, String dataverseName, String libraryName) throws MetadataException, RemoteException { - Library library = null; + Library library; try { library = metadataNode.getLibrary(ctx.getJobId(), dataverseName, libraryName); } catch (RemoteException e) { @@ -792,18 +757,18 @@ public FeedPolicyEntity getFeedPolicy(MetadataTransactionContext ctx, String dataverse, String policyName) throws MetadataException { - FeedPolicyEntity FeedPolicy = null; + FeedPolicyEntity feedPolicy; try { - FeedPolicy = metadataNode.getFeedPolicy(ctx.getJobId(), dataverse, policyName); + feedPolicy = metadataNode.getFeedPolicy(ctx.getJobId(), dataverse, policyName); } catch (RemoteException e) { throw new MetadataException(e); } - return FeedPolicy; + return feedPolicy; } @Override public Feed getFeed(MetadataTransactionContext ctx, String dataverse, String feedName) throws MetadataException { - Feed feed = null; + Feed feed; try { feed = metadataNode.getFeed(ctx.getJobId(), dataverse, feedName); } catch (RemoteException e) { @@ -814,7 +779,7 @@ @Override public void dropFeed(MetadataTransactionContext ctx, String dataverse, String feedName) throws MetadataException { - Feed feed = null; + Feed feed; try { feed = metadataNode.getFeed(ctx.getJobId(), dataverse, feedName); metadataNode.dropFeed(ctx.getJobId(), dataverse, feedName); @@ -834,6 +799,7 @@ ctx.addFeed(feed); } + @Override public List<DatasourceAdapter> getDataverseAdapters(MetadataTransactionContext mdTxnCtx, String dataverse) throws MetadataException { List<DatasourceAdapter> dataverseAdapters; @@ -845,9 +811,10 @@ return dataverseAdapters; } + @Override public void dropFeedPolicy(MetadataTransactionContext mdTxnCtx, String dataverseName, String policyName) throws MetadataException { - FeedPolicyEntity feedPolicy = null; + FeedPolicyEntity feedPolicy; try { feedPolicy = metadataNode.getFeedPolicy(mdTxnCtx.getJobId(), dataverseName, policyName); 
metadataNode.dropFeedPolicy(mdTxnCtx.getJobId(), dataverseName, policyName); @@ -901,7 +868,7 @@ @Override public ExternalFile getExternalFile(MetadataTransactionContext ctx, String dataverseName, String datasetName, - Integer fileNumber) throws MetadataException { + Integer fileNumber) throws MetadataException { ExternalFile file; try { file = metadataNode.getExternalFile(ctx.getJobId(), dataverseName, datasetName, fileNumber); @@ -939,7 +906,7 @@ cache.cleanupTempDatasets(); } - private Dataset findDataset(MetadataTransactionContext ctx, String dataverseName, String datasetName) { + public Dataset findDataset(MetadataTransactionContext ctx, String dataverseName, String datasetName) { Dataset dataset = ctx.getDataset(dataverseName, datasetName); if (dataset == null) { dataset = cache.getDataset(dataverseName, datasetName); @@ -969,7 +936,8 @@ @Override public <T extends IExtensionMetadataEntity> List<T> getEntities(MetadataTransactionContext mdTxnCtx, - IExtensionMetadataSearchKey searchKey) throws MetadataException { + IExtensionMetadataSearchKey searchKey) + throws MetadataException { try { return metadataNode.getEntities(mdTxnCtx.getJobId(), searchKey); } catch (RemoteException e) { @@ -977,7 +945,49 @@ } } - public static synchronized void instantiate(MetadataManager metadataManager) { - MetadataManager.INSTANCE = metadataManager; + @Override + public void rebindMetadataNode() { + rebindMetadataNode = true; + } + + public static void initialize(IAsterixStateProxy proxy, AsterixMetadataProperties metadataProperties) { + INSTANCE = new CCMetadataManagerImpl(proxy, metadataProperties); + } + + public static void initialize(IAsterixStateProxy proxy, MetadataNode metadataNode) { + INSTANCE = new MetadataManager(proxy, metadataNode); + } + + private static class CCMetadataManagerImpl extends MetadataManager { + private final AsterixMetadataProperties metadataProperties; + + public CCMetadataManagerImpl(IAsterixStateProxy proxy, AsterixMetadataProperties 
metadataProperties) { + super(proxy); + this.metadataProperties = metadataProperties; + } + + @Override + public synchronized void init() throws HyracksDataException { + if (metadataNode != null && !rebindMetadataNode) { + return; + } + try { + metadataNode = proxy.waitForMetadataNode(metadataProperties.getRegistrationTimeoutSecs(), + TimeUnit.SECONDS); + if (metadataNode != null) { + rebindMetadataNode = false; + } else { + throw new HyracksDataException("The MetadataNode failed to bind before the configured timeout (" + + metadataProperties.getRegistrationTimeoutSecs() + " seconds); the MetadataNode was " + + "configured to run on NC: " + metadataProperties.getMetadataNodeName()); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new HyracksDataException(e); + } catch (RemoteException e) { + throw new HyracksDataException(e); + } + super.init(); + } } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IAsterixStateProxy.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IAsterixStateProxy.java index 7717a79..c94e159 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IAsterixStateProxy.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IAsterixStateProxy.java @@ -22,12 +22,13 @@ import java.io.Serializable; import java.rmi.Remote; import java.rmi.RemoteException; +import java.util.concurrent.TimeUnit; /** * Interface for setting/getting distributed state of Asterix. 
*/ public interface IAsterixStateProxy extends Remote, Serializable { - public void setMetadataNode(IMetadataNode metadataNode) throws RemoteException; + void setMetadataNode(IMetadataNode metadataNode) throws RemoteException; - public IMetadataNode getMetadataNode() throws RemoteException; + IMetadataNode waitForMetadataNode(long waitFor, TimeUnit timeUnit) throws RemoteException, InterruptedException; } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java index 0acc027..feb4db0 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java @@ -24,6 +24,7 @@ import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.functions.FunctionSignature; +import org.apache.asterix.common.metadata.IMetadataBootstrap; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.metadata.MetadataException; import org.apache.asterix.metadata.MetadataTransactionContext; @@ -39,7 +40,6 @@ import org.apache.asterix.metadata.entities.Library; import org.apache.asterix.metadata.entities.Node; import org.apache.asterix.metadata.entities.NodeGroup; -import org.apache.hyracks.api.exceptions.HyracksDataException; /** * A metadata manager provides user access to Asterix metadata (e.g., types, @@ -53,16 +53,7 @@ * finer levels is the responsibility of the metadata node, not the metadata * manager or its user. */ -public interface IMetadataManager { - - /** - * Initializes the metadata manager, e.g., finds the remote metadata node. - * - * @throws RemoteException - * If an error occurred while contacting the proxy for finding - * the metadata node. 
- */ - void init() throws RemoteException, MetadataException; +public interface IMetadataManager extends IMetadataBootstrap { /** * Begins a transaction on the metadata node. @@ -256,7 +247,8 @@ * Name of the datavers holding the given dataset. * @param datasetName * Name of the dataset holding the index. - * @indexName Name of the index to retrieve. + * @param indexName + * Name of the index to retrieve. * @return An Index instance. * @throws MetadataException * For example, if the index does not exist. @@ -273,7 +265,8 @@ * Name of the datavers holding the given dataset. * @param datasetName * Name of the dataset holding the index. - * @indexName Name of the index to retrieve. + * @param indexName + * Name of the index to retrieve. * @throws MetadataException * For example, if the index does not exist. */ @@ -406,7 +399,7 @@ /** * @param mdTxnCtx * MetadataTransactionContext of an active metadata transaction. - * @param function + * @param adapter * An instance of type Adapter that represents the adapter being * added * @throws MetadataException @@ -418,7 +411,7 @@ * MetadataTransactionContext of an active metadata transaction. * @param dataverseName * the dataverse associated with the adapter being searched - * @param Name + * @param name * name of the adapter * @return * @throws MetadataException @@ -436,6 +429,18 @@ * @throws MetadataException */ void dropAdapter(MetadataTransactionContext ctx, String dataverseName, String name) throws MetadataException; + + /** + * + * @param ctx + * MetadataTransactionContext of an active metadata transaction. 
+ * @param dataverseName + * the dataverse whose associated adapters are being requested + * @return + * @throws MetadataException + */ + List<DatasourceAdapter> getDataverseAdapters(MetadataTransactionContext ctx, String dataverseName) + throws MetadataException; /** * @param ctx @@ -497,6 +502,14 @@ * @param ctx * @param dataverse * @param policyName + * @throws MetadataException + */ + void dropFeedPolicy(MetadataTransactionContext ctx, String dataverse, String policyName) throws MetadataException; + + /** + * @param ctx + * @param dataverse + * @param policyName * @return * @throws MetadataException */ @@ -526,7 +539,7 @@ * @param libraryName * Name of library to be deleted. MetadataException for example, * if the library does not exists. - * @throws RemoteException + * @throws MetadataException */ void dropLibrary(MetadataTransactionContext ctx, String dataverseName, String libraryName) throws MetadataException; @@ -540,7 +553,6 @@ * Library to be added * @throws MetadataException * for example, if the library is already added. - * @throws RemoteException */ void addLibrary(MetadataTransactionContext ctx, Library library) throws MetadataException; @@ -567,7 +579,6 @@ * dataverse asociated with the library that is to be retrieved. 
* @return Library * @throws MetadataException - * @throws RemoteException */ List<Library> getDataverseLibraries(MetadataTransactionContext ctx, String dataverseName) throws MetadataException; @@ -671,9 +682,13 @@ * @param searchKey * @return * @throws MetadataException - * @throws HyracksDataException */ <T extends IExtensionMetadataEntity> List<T> getEntities(MetadataTransactionContext mdTxnCtx, - IExtensionMetadataSearchKey searchKey) throws MetadataException, HyracksDataException; + IExtensionMetadataSearchKey searchKey) throws MetadataException; + /** + * Indicate when the metadata node has left or rejoined the cluster, and the MetadataManager should + * rebind it + */ + void rebindMetadataNode(); } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java index 2f881be..da6bb54 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java @@ -21,6 +21,7 @@ import java.rmi.RemoteException; import java.rmi.server.UnicastRemoteObject; +import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import org.apache.asterix.metadata.api.IAsterixStateProxy; @@ -36,8 +37,8 @@ private IMetadataNode metadataNode; private static final IAsterixStateProxy cc = new AsterixStateProxy(); - public static IAsterixStateProxy registerRemoteObject() throws RemoteException { - IAsterixStateProxy stub = (IAsterixStateProxy) UnicastRemoteObject.exportObject(cc, 0); + public static IAsterixStateProxy registerRemoteObject(int metadataCallbackPort) throws RemoteException { + IAsterixStateProxy stub = (IAsterixStateProxy) UnicastRemoteObject.exportObject(cc, metadataCallbackPort); LOGGER.info("Asterix Distributed State Proxy Bound"); return stub; } @@ -48,12 
+49,21 @@ } @Override - public void setMetadataNode(IMetadataNode metadataNode) throws RemoteException { + public synchronized void setMetadataNode(IMetadataNode metadataNode) { this.metadataNode = metadataNode; + notifyAll(); } @Override - public IMetadataNode getMetadataNode() throws RemoteException { - return this.metadataNode; + public IMetadataNode waitForMetadataNode(long waitFor, TimeUnit timeUnit) throws InterruptedException { + synchronized (this) { + long timeToWait = TimeUnit.MILLISECONDS.convert(waitFor, timeUnit); + while (metadataNode == null && timeToWait > 0) { + long startTime = System.currentTimeMillis(); + wait(timeToWait); + timeToWait -= System.currentTimeMillis() - startTime; + } + return metadataNode; + } } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java index 5cb1a6a..dba3bc7 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackRequestMessage.java @@ -63,7 +63,7 @@ } @Override - public void handle(IControllerService cs) throws HyracksDataException { + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { NodeControllerService ncs = (NodeControllerService) cs; IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java index 4ae73ea..cb56c39 100644 --- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/CompleteFailbackResponseMessage.java @@ -48,7 +48,7 @@ } @Override - public void handle(IControllerService cs) throws HyracksDataException { + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { ClusterStateManager.INSTANCE.processCompleteFailbackResponse(this); } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java index c112366..7283b89 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackRequestMessage.java @@ -72,7 +72,7 @@ } @Override - public void handle(IControllerService cs) throws HyracksDataException { + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { NodeControllerService ncs = (NodeControllerService) cs; IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java index db89f7c..d87cd23 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/PreparePartitionsFailbackResponseMessage.java @@ -39,7 +39,7 @@ } @Override - public 
void handle(IControllerService cs) throws HyracksDataException { + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { ClusterStateManager.INSTANCE.processPreparePartitionsFailbackResponse(this); } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java index fc55968..7776543 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReplicaEventMessage.java @@ -54,7 +54,7 @@ } @Override - public void handle(IControllerService cs) throws HyracksDataException { + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { NodeControllerService ncs = (NodeControllerService) cs; IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java index f9f6233..c1319e0 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java @@ -48,7 +48,7 @@ } @Override - public void handle(IControllerService cs) throws HyracksDataException { + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { IAsterixResourceIdManager resourceIdManager = AsterixAppContextInfo.INSTANCE.getResourceIdManager(); resourceIdManager.report(src, maxResourceId); diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java index 203104e..a1290df 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdRequestMessage.java @@ -27,7 +27,7 @@ private static final long serialVersionUID = 1L; @Override - public void handle(IControllerService cs) throws HyracksDataException { + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { ReportMaxResourceIdMessage.send((NodeControllerService) cs); } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java index afe2427..a2c8d74 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java @@ -38,7 +38,7 @@ } @Override - public void handle(IControllerService cs) throws HyracksDataException { + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { try { ICCMessageBroker broker = (ICCMessageBroker) AsterixAppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker(); diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java index e877f52..7264c88 100644 --- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeRequestMessage.java @@ -35,7 +35,7 @@ private static final Logger LOGGER = Logger.getLogger(TakeoverMetadataNodeRequestMessage.class.getName()); @Override - public void handle(IControllerService cs) throws HyracksDataException { + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { NodeControllerService ncs = (NodeControllerService) cs; IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java index 2466e2b..d3c3502 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverMetadataNodeResponseMessage.java @@ -37,7 +37,7 @@ } @Override - public void handle(IControllerService cs) throws HyracksDataException { + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { ClusterStateManager.INSTANCE.processMetadataNodeTakeoverResponse(this); } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java index e024eed..e78f159 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java +++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsRequestMessage.java @@ -74,7 +74,7 @@ } @Override - public void handle(IControllerService cs) throws HyracksDataException { + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { NodeControllerService ncs = (NodeControllerService) cs; IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject(); diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java index a4a5226..3adc8e9 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TakeoverPartitionsResponseMessage.java @@ -49,7 +49,7 @@ } @Override - public void handle(IControllerService cs) throws HyracksDataException { + public void handle(IControllerService cs) throws HyracksDataException, InterruptedException { ClusterStateManager.INSTANCE.processPartitionTakeoverResponse(this); } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java index 471d3d3..592f9df 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/AsterixAppContextInfo.java @@ -19,6 +19,7 @@ package org.apache.asterix.runtime.util; import java.io.IOException; +import java.util.function.Supplier; import java.util.logging.Logger; import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger; @@ -37,8 +38,8 @@ import 
org.apache.asterix.common.dataflow.IAsterixApplicationContextInfo; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.library.ILibraryManager; +import org.apache.asterix.common.metadata.IMetadataBootstrap; import org.apache.asterix.common.transactions.IAsterixResourceIdManager; -import org.apache.hyracks.api.application.IApplicationConfig; import org.apache.hyracks.api.application.ICCApplicationContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; @@ -66,6 +67,7 @@ private AsterixReplicationProperties replicationProperties; private AsterixExtensionProperties extensionProperties; private MessagingProperties messagingProperties; + private Supplier<IMetadataBootstrap> metadataBootstrapSupplier; private IHyracksClientConnection hcc; private Object extensionManager; private volatile boolean initialized = false; @@ -74,8 +76,10 @@ } public static synchronized void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc, - IGlobalRecoveryMaanger globalRecoveryMaanger, ILibraryManager libraryManager, - IAsterixResourceIdManager resourceIdManager) + IGlobalRecoveryMaanger globalRecoveryMaanger, + ILibraryManager libraryManager, + IAsterixResourceIdManager resourceIdManager, + Supplier<IMetadataBootstrap> metadataBootstrapSupplier) throws AsterixException, IOException { if (INSTANCE.initialized) { throw new AsterixException(AsterixAppContextInfo.class.getSimpleName() + " has been initialized already"); @@ -88,14 +92,7 @@ INSTANCE.resourceIdManager = resourceIdManager; // Determine whether to use old-style asterix-configuration.xml or new-style configuration. 
// QQQ strip this out eventually - AsterixPropertiesAccessor propertiesAccessor; - IApplicationConfig cfg = ccAppCtx.getAppConfig(); - // QQQ this is NOT a good way to determine whether the config is valid - if (cfg.getString("cc", "cluster.address") != null) { - propertiesAccessor = new AsterixPropertiesAccessor(cfg); - } else { - propertiesAccessor = new AsterixPropertiesAccessor(); - } + AsterixPropertiesAccessor propertiesAccessor = AsterixPropertiesAccessor.getInstance(ccAppCtx.getAppConfig()); INSTANCE.compilerProperties = new AsterixCompilerProperties(propertiesAccessor); INSTANCE.externalProperties = new AsterixExternalProperties(propertiesAccessor); INSTANCE.metadataProperties = new AsterixMetadataProperties(propertiesAccessor); @@ -107,6 +104,8 @@ INSTANCE.hcc = hcc; INSTANCE.buildProperties = new AsterixBuildProperties(propertiesAccessor); INSTANCE.messagingProperties = new MessagingProperties(propertiesAccessor); + INSTANCE.metadataBootstrapSupplier = metadataBootstrapSupplier; + Logger.getLogger("org.apache.asterix").setLevel(INSTANCE.externalProperties.getLogLevel()); Logger.getLogger("org.apache.hyracks").setLevel(INSTANCE.externalProperties.getLogLevel()); } @@ -204,4 +203,8 @@ public IAsterixResourceIdManager getResourceIdManager() { return resourceIdManager; } + + public IMetadataBootstrap getMetadataBootstrap() { + return metadataBootstrapSupplier.get(); + } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java index bc15788..942abe3 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java @@ -51,6 +51,8 @@ import org.apache.asterix.runtime.message.TakeoverPartitionsResponseMessage; import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.HyracksException; import org.json.JSONException; import org.json.JSONObject; @@ -109,7 +111,7 @@ } } - public synchronized void removeNCConfiguration(String nodeId) { + public synchronized void removeNCConfiguration(String nodeId) throws HyracksException { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Removing configuration parameters for node id " + nodeId); } @@ -139,7 +141,8 @@ } } - public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration) { + public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration) + throws HyracksException { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Registering configuration parameters for node id " + nodeId); } @@ -167,7 +170,7 @@ updateNodePartitions(nodeId, true); } - private synchronized void updateNodePartitions(String nodeId, boolean added) { + private synchronized void updateNodePartitions(String nodeId, boolean added) throws HyracksDataException { ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId); // if this isn't a storage node, it will not have cluster partitions if (nodePartitions != null) { @@ -183,7 +186,7 @@ } } - private synchronized void updateClusterState() { + private synchronized void updateClusterState() throws HyracksDataException { for (ClusterPartition p : clusterPartitions.values()) { if (!p.isActive()) { state = ClusterState.UNUSABLE; @@ -191,11 +194,14 @@ return; } } - //if all storage partitions are active as well as the metadata node, then the cluster is active + // if all storage partitions are active as well as the metadata node, then the cluster is active if (metadataNodeActive) { + state = ClusterState.PENDING; + LOGGER.info("Cluster is 
now " + state); + AsterixAppContextInfo.INSTANCE.getMetadataBootstrap().init(); state = ClusterState.ACTIVE; - LOGGER.info("Cluster is now ACTIVE"); - //start global recovery + LOGGER.info("Cluster is now " + state); + // start global recovery AsterixAppContextInfo.INSTANCE.getGlobalRecoveryManager().startGlobalRecovery(); if (autoFailover && !pendingProcessingFailbackPlans.isEmpty()) { processPendingFailbackPlans(); @@ -412,19 +418,21 @@ } } - public synchronized void processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage reponse) { - for (Integer partitonId : reponse.getPartitions()) { + public synchronized void processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage response) + throws HyracksDataException { + for (Integer partitonId : response.getPartitions()) { ClusterPartition partition = clusterPartitions.get(partitonId); partition.setActive(true); - partition.setActiveNodeId(reponse.getNodeId()); + partition.setActiveNodeId(response.getNodeId()); } - pendingTakeoverRequests.remove(reponse.getRequestId()); + pendingTakeoverRequests.remove(response.getRequestId()); resetClusterPartitionConstraint(); updateClusterState(); } - public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage reponse) { - currentMetadataNode = reponse.getNodeId(); + public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage response) + throws HyracksDataException { + currentMetadataNode = response.getNodeId(); metadataNodeActive = true; LOGGER.info("Current metadata node: " + currentMetadataNode); updateClusterState(); @@ -556,7 +564,8 @@ } } - public synchronized void processCompleteFailbackResponse(CompleteFailbackResponseMessage reponse) { + public synchronized void processCompleteFailbackResponse(CompleteFailbackResponseMessage response) + throws HyracksDataException { /** * the failback plan completed successfully: * Remove all references to it. 
@@ -564,7 +573,7 @@ * Notify its replicas to reconnect to it. * Set the failing back node partitions as active. */ - NodeFailbackPlan plan = planId2FailbackPlanMap.remove(reponse.getPlanId()); + NodeFailbackPlan plan = planId2FailbackPlanMap.remove(response.getPlanId()); String nodeId = plan.getNodeId(); failedNodes.remove(nodeId); //notify impacted replicas they can reconnect to this node diff --git a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java index e3a561d..77effb6 100644 --- a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java +++ b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java @@ -155,7 +155,7 @@ } } - @Parameters + @Parameters(name = "NCServiceExecutionTest {index}: {0}") public static Collection<Object[]> tests() throws Exception { Collection<Object[]> testArgs = new ArrayList<Object[]>(); TestCaseContext.Builder b = new TestCaseContext.Builder(); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java index 733382b..a9bef18 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java @@ -21,8 +21,11 @@ import java.util.Map; import java.util.Set; +import org.apache.hyracks.api.exceptions.HyracksException; + /** - * A listener interface for providing notification call backs to events such as a Node Controller joining/leaving the cluster. 
+ * A listener interface for providing notification call backs to events such as a Node Controller joining/leaving the + * cluster. */ public interface IClusterLifecycleListener { @@ -35,15 +38,15 @@ /** * @param nodeId * A unique identifier of a Node Controller - * @param ncConfig + * @param ncConfiguration * A map containing the set of configuration parameters that were used to start the Node Controller */ - public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration); + public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration) throws HyracksException; /** * @param deadNodeIds * A set of Node Controller Ids that have left the cluster. The set is not cumulative. */ - public void notifyNodeFailure(Set<String> deadNodeIds); + public void notifyNodeFailure(Set<String> deadNodeIds) throws HyracksException; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java index 786a89f..dff3107 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java @@ -23,7 +23,7 @@ import org.apache.hyracks.control.common.controllers.CCConfig; public class CCDriver { - public static void main(String args[]) throws Exception { + public static void main(String args []) throws Exception { try { CCConfig ccConfig = new CCConfig(); CmdLineParser cp = new CmdLineParser(ccConfig); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java index dfce7b8..dd6f83b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java @@ -110,7 +110,7 @@ } } - public void notifyNodeFailure(Set<String> deadNodeIds) { + public void notifyNodeFailure(Set<String> deadNodeIds) throws HyracksException { for (IClusterLifecycleListener l : clusterLifecycleListeners) { l.notifyNodeFailure(deadNodeIds); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java index b3a3065..510c729 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java @@ -24,6 +24,7 @@ import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.NodeControllerState; @@ -41,7 +42,7 @@ @Override public void run() { - Set<String> deadNodes = new HashSet<String>(); + final Set<String> deadNodes = new HashSet<String>(); Map<String, NodeControllerState> nodeMap = ccs.getNodeMap(); for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) { NodeControllerState state = e.getValue(); @@ -69,8 +70,12 @@ } } } - if 
(deadNodes != null && deadNodes.size() > 0) { - ccs.getApplicationContext().notifyNodeFailure(deadNodes); + if (!deadNodes.isEmpty()) { + try { + ccs.getApplicationContext().notifyNodeFailure(deadNodes); + } catch (HyracksException e) { + LOGGER.log(Level.WARNING, "Uncaught exception on notifyNodeFailure", e); + } } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java index 64bd7d1..98d6375 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java @@ -34,7 +34,9 @@ @Option(name = "-address", usage = "IP Address for CC (default: localhost)", required = false) public String ipAddress = InetAddress.getLoopbackAddress().getHostAddress(); - @Option(name = "-client-net-ip-address", usage = "Sets the IP Address to listen for connections from clients (default: same as -address)", required = false) + @Option(name = "-client-net-ip-address", + usage = "Sets the IP Address to listen for connections from clients (default: same as -address)", + required = false) public String clientNetIpAddress; @Option(name = "-client-net-port", usage = "Sets the port to listen for connections from clients (default 1098)") @@ -43,46 +45,60 @@ // QQQ Note that clusterNetIpAddress is *not directly used* yet. Both // the cluster listener and the web server listen on "all interfaces". // This IP address is only used to instruct the NC on which IP to call in. 
- @Option(name = "-cluster-net-ip-address", usage = "Sets the IP Address to listen for connections from NCs (default: same as -address)", required = false) + @Option(name = "-cluster-net-ip-address", + usage = "Sets the IP Address to listen for connections from NCs (default: same as -address)", + required = false) public String clusterNetIpAddress; - @Option(name = "-cluster-net-port", usage = "Sets the port to listen for connections from node controllers (default 1099)") + @Option(name = "-cluster-net-port", + usage = "Sets the port to listen for connections from node controllers (default 1099)") public int clusterNetPort = 1099; @Option(name = "-http-port", usage = "Sets the http port for the Cluster Controller (default: 16001)") public int httpPort = 16001; - @Option(name = "-heartbeat-period", usage = "Sets the time duration between two heartbeats from each node controller in milliseconds (default: 10000)") + @Option(name = "-heartbeat-period", + usage = "Sets the time duration between two heartbeats from each node controller in milliseconds" + + " (default: 10000)") public int heartbeatPeriod = 10000; - @Option(name = "-max-heartbeat-lapse-periods", usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)") + @Option(name = "-max-heartbeat-lapse-periods", + usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)") public int maxHeartbeatLapsePeriods = 5; - @Option(name = "-profile-dump-period", usage = "Sets the time duration between two profile dumps from each node controller in milliseconds. 0 to disable. (default: 0)") + @Option(name = "-profile-dump-period", usage = "Sets the time duration between two profile dumps from each node " + + "controller in milliseconds. 0 to disable. (default: 0)") public int profileDumpPeriod = 0; - @Option(name = "-default-max-job-attempts", usage = "Sets the default number of job attempts allowed if not specified in the job specification. 
(default: 5)") + @Option(name = "-default-max-job-attempts", usage = "Sets the default number of job attempts allowed if not " + + "specified in the job specification. (default: 5)") public int defaultMaxJobAttempts = 5; - @Option(name = "-job-history-size", usage = "Limits the number of historical jobs remembered by the system to the specified value. (default: 10)") + @Option(name = "-job-history-size", usage = "Limits the number of historical jobs remembered by the system to " + + "the specified value. (default: 10)") public int jobHistorySize = 10; - @Option(name = "-result-time-to-live", usage = "Limits the amount of time results for asynchronous jobs should be retained by the system in milliseconds. (default: 24 hours)") + @Option(name = "-result-time-to-live", usage = "Limits the amount of time results for asynchronous jobs should " + + "be retained by the system in milliseconds. (default: 24 hours)") public long resultTTL = 86400000; - @Option(name = "-result-sweep-threshold", usage = "The duration within which an instance of the result cleanup should be invoked in milliseconds. (default: 1 minute)") + @Option(name = "-result-sweep-threshold", usage = "The duration within which an instance of the result cleanup " + + "should be invoked in milliseconds. (default: 1 minute)") public long resultSweepThreshold = 60000; - @Option(name = "-cc-root", usage = "Sets the root folder used for file operations. (default: ClusterControllerService)") + @Option(name = "-cc-root", + usage = "Sets the root folder used for file operations. (default: ClusterControllerService)") public String ccRoot = "ClusterControllerService"; - @Option(name = "-cluster-topology", required = false, usage = "Sets the XML file that defines the cluster topology. (default: null)") + @Option(name = "-cluster-topology", required = false, + usage = "Sets the XML file that defines the cluster topology. 
(default: null)") public File clusterTopologyDefinition = null; @Option(name = "-app-cc-main-class", required = false, usage = "Application CC Main Class") public String appCCMainClass = null; - @Option(name = "-config-file", usage = "Specify path to master configuration file (default: none)", required = false) + @Option(name = "-config-file", + usage = "Specify path to master configuration file (default: none)", required = false) public String configFile = null; @Argument @@ -132,8 +148,8 @@ } // "address" is the default for all IP addresses - if (clusterNetIpAddress == null) clusterNetIpAddress = ipAddress; - if (clientNetIpAddress == null) clientNetIpAddress = ipAddress; + clusterNetIpAddress = clusterNetIpAddress == null ? ipAddress : clusterNetIpAddress; + clientNetIpAddress = clientNetIpAddress == null ? ipAddress : clientNetIpAddress; } /** diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java index c6c3e73..e999de4 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java @@ -35,7 +35,11 @@ * the section "nc/red", but if it is not found, will look in the section "nc". */ public class IniUtils { - private static <T> T getIniValue(Ini ini, String section, String key, T default_value, Class<T> clazz) { + + private IniUtils() { + } + + private static <T> T getIniValue(Ini ini, String section, String key, T defaultValue, Class<T> clazz) { T value; while (true) { value = ini.get(section, key, clazz); @@ -48,7 +52,7 @@ } break; } - return (value != null) ? 
value : default_value; + return (value != null) ? value : defaultValue; } @SuppressWarnings("unchecked") diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java index ce5043a..2e47f41 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java @@ -288,11 +288,9 @@ configuration.put("messaging-port", String.valueOf(messagingPort)); configuration.put("messaging-public-ip-address", messagingPublicIPAddress); configuration.put("messaging-public-port", String.valueOf(messagingPublicPort)); + configuration.put("ncservice-pid", String.valueOf(ncservicePid)); if (appNCMainClass != null) { configuration.put("app-nc-main-class", appNCMainClass); - } - if (ncservicePid != 0) { - configuration.put("ncservice-pid", String.valueOf(ncservicePid)); } } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/SynchronizableWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/SynchronizableWork.java index 02a789a..f9952db 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/SynchronizableWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/work/SynchronizableWork.java @@ -18,6 +18,9 @@ */ package org.apache.hyracks.control.common.work; +import java.util.logging.Level; +import java.util.logging.Logger; + public 
abstract class SynchronizableWork extends AbstractWork { private boolean done; @@ -34,8 +37,9 @@ public final void run() { try { doRun(); - } catch (Exception e) { - this.e = e; + } catch (Exception ex) { + Logger.getLogger(getClass().getName()).log(Level.INFO, "Exception thrown from work", ex); + this.e = ex; } finally { synchronized (this) { done = true; @@ -46,11 +50,7 @@ public final synchronized void sync() throws Exception { while (!done) { - try { - wait(); - } catch (InterruptedException e) { - throw e; - } + wait(); } if (e != null) { throw e; diff --git a/hyracks-fullstack/hyracks/hyracks-server/src/test/java/org/apache/hyracks/server/test/NCServiceIT.java b/hyracks-fullstack/hyracks/hyracks-server/src/test/java/org/apache/hyracks/server/test/NCServiceIT.java index 13607ad..8d1246b 100644 --- a/hyracks-fullstack/hyracks/hyracks-server/src/test/java/org/apache/hyracks/server/test/NCServiceIT.java +++ b/hyracks-fullstack/hyracks/hyracks-server/src/test/java/org/apache/hyracks/server/test/NCServiceIT.java @@ -41,7 +41,7 @@ public class NCServiceIT { private static final String TARGET_DIR = StringUtils - .join(new String[] { System.getProperty("basedir"), "target" }, File.separator); + .join(new String[] { ".", "target" }, File.separator); private static final String LOG_DIR = StringUtils .join(new String[] { TARGET_DIR, "failsafe-reports" }, File.separator); private static final String RESOURCE_DIR = StringUtils -- To view, visit https://asterix-gerrit.ics.uci.edu/1247 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I48d7c10b3e43181ec307f7d890ba721f61bc2ab0 Gerrit-PatchSet: 8 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Michael Blow <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
