Michael Blow has submitted this change and it was merged.

Change subject: [NO ISSUE][HYR][*DB][CLUS] Startup lifecycle fixes
......................................................................
[NO ISSUE][HYR][*DB][CLUS] Startup lifecycle fixes

- Ensure thread factory is configured before using it
- Don't mark cluster state ACTIVE until after global recovery has completed
- Failure of global recovery causes CC to shutdown
- Don't mark cluster state ACTIVE until max resource id has been reported by all nodes

Change-Id: Id30415325047008c013e305ca11ccbb76bc7d8d8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2004
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Contrib: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java M hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java 16 files changed, 143 insertions(+), 89 deletions(-) Approvals: Anon. E. Moose #1000171: Jenkins: Verified; ; Verified Murtadha Hubail: Looks good to me, approved Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index 37e0c58..ef3800c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -108,12 +108,19 @@ private IHyracksClientConnection hcc; @Override - public void start(IServiceContext serviceCtx, String[] args) throws Exception { + public void init(IServiceContext serviceCtx) throws Exception { + ccServiceCtx = (ICCServiceContext) serviceCtx; + ccServiceCtx.setThreadFactory( + new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new LifeCycleComponentManager())); + } + + @Override + public void start(String[] args) throws Exception { if (args.length > 0) { throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args)); } - final ClusterControllerService controllerService = (ClusterControllerService) serviceCtx.getControllerService(); - this.ccServiceCtx = (ICCServiceContext) serviceCtx; + final ClusterControllerService controllerService = 
(ClusterControllerService) ccServiceCtx + .getControllerService(); ccServiceCtx.setMessageBroker(new CCMessageBroker(controllerService)); configureLoggingLevel(ccServiceCtx.getAppConfig().getLoggingLevel(ExternalProperties.Option.LOG_LEVEL)); @@ -122,8 +129,6 @@ LOGGER.info("Starting Asterix cluster controller"); } - ccServiceCtx.setThreadFactory( - new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new LifeCycleComponentManager())); String strIP = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress(); int port = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort(); hcc = new HyracksConnection(strIP, port); @@ -207,8 +212,8 @@ } protected HttpServer setupJSONAPIServer(ExternalProperties externalProperties) throws Exception { - HttpServer jsonAPIServer = - new HttpServer(webManager.getBosses(), webManager.getWorkers(), externalProperties.getAPIServerPort()); + HttpServer jsonAPIServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), + externalProperties.getAPIServerPort()); jsonAPIServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc); jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, appCtx); jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR, diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java index 3209557..13f3afa 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java @@ -92,11 +92,8 @@ MetadataTransactionContext mdTxnCtx = null; try { MetadataManager.INSTANCE.init(); - // Loop over datasets mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - for (Dataverse dataverse : MetadataManager.INSTANCE.getDataverses(mdTxnCtx)) { - mdTxnCtx = 
recoverDataset(appCtx, mdTxnCtx, dataverse); - } + mdTxnCtx = doRecovery(appCtx, mdTxnCtx); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); } catch (Exception e) { // This needs to be fixed <-- Needs to shutdown the system --> @@ -109,8 +106,8 @@ try { MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); } catch (Exception e1) { - LOGGER.log(Level.SEVERE, "Exception in aborting", e1); - e1.addSuppressed(e); + LOGGER.log(Level.SEVERE, "Exception aborting metadata transaction", e1); + e.addSuppressed(e1); throw new IllegalStateException(e); } } @@ -118,11 +115,21 @@ } recoveryCompleted = true; LOGGER.info("Global Recovery Completed"); + appCtx.getClusterStateManager().refreshState(); + } + + protected MetadataTransactionContext doRecovery(ICcApplicationContext appCtx, MetadataTransactionContext mdTxnCtx) + throws Exception { + // Loop over datasets + for (Dataverse dataverse : MetadataManager.INSTANCE.getDataverses(mdTxnCtx)) { + mdTxnCtx = recoverDataset(appCtx, mdTxnCtx, dataverse); + } + return mdTxnCtx; } @Override public void notifyStateChange(ClusterState newState) { - if (newState != ClusterState.ACTIVE) { + if (newState != ClusterState.ACTIVE && newState != ClusterState.RECOVERING) { recoveryCompleted = false; } } @@ -132,8 +139,8 @@ if (!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)) { MetadataProvider metadataProvider = new MetadataProvider(appCtx, dataverse); try { - List<Dataset> datasets = - MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverse.getDataverseName()); + List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, + dataverse.getDataverseName()); for (Dataset dataset : datasets) { if (dataset.getDatasetType() == DatasetType.EXTERNAL) { // External dataset @@ -145,8 +152,8 @@ TransactionState datasetState = dsd.getState(); if (!indexes.isEmpty()) { if (datasetState == TransactionState.BEGIN) { - List<ExternalFile> files = - 
MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset); + List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, + dataset); // if persumed abort, roll backward // 1. delete all pending files for (ExternalFile file : files) { @@ -157,8 +164,8 @@ } // 2. clean artifacts in NCs metadataProvider.setMetadataTxnContext(mdTxnCtx); - JobSpecification jobSpec = - ExternalIndexingOperations.buildAbortOp(dataset, indexes, metadataProvider); + JobSpecification jobSpec = ExternalIndexingOperations.buildAbortOp(dataset, indexes, + metadataProvider); executeHyracksJob(jobSpec); // 3. correct the dataset state ((ExternalDatasetDetails) dataset.getDatasetDetails()).setState(TransactionState.COMMIT); @@ -166,13 +173,13 @@ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); } else if (datasetState == TransactionState.READY_TO_COMMIT) { - List<ExternalFile> files = - MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset); + List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, + dataset); // if ready to commit, roll forward // 1. commit indexes in NCs metadataProvider.setMetadataTxnContext(mdTxnCtx); - JobSpecification jobSpec = - ExternalIndexingOperations.buildRecoverOp(dataset, indexes, metadataProvider); + JobSpecification jobSpec = ExternalIndexingOperations.buildRecoverOp(dataset, indexes, + metadataProvider); executeHyracksJob(jobSpec); // 2. 
add pending files in metadata for (ExternalFile file : files) { @@ -221,4 +228,5 @@ public boolean isRecoveryCompleted() { return recoveryCompleted; } + } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index 0f6b396..e8f63b4 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -80,13 +80,17 @@ } @Override - public void start(IServiceContext serviceCtx, String[] args) throws Exception { - if (args.length > 0) { - throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args)); - } + public void init(IServiceContext serviceCtx) throws Exception { this.ncServiceCtx = (INCServiceContext) serviceCtx; ncServiceCtx.setThreadFactory( new AsterixThreadFactory(ncServiceCtx.getThreadFactory(), ncServiceCtx.getLifeCycleComponentManager())); + } + + @Override + public void start(String[] args) throws Exception { + if (args.length > 0) { + throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args)); + } nodeId = this.ncServiceCtx.getNodeId(); if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Starting Asterix node controller: " + nodeId); @@ -111,8 +115,8 @@ MessagingProperties messagingProperties = runtimeContext.getMessagingProperties(); IMessageBroker messageBroker = new NCMessageBroker(controllerService, messagingProperties); this.ncServiceCtx.setMessageBroker(messageBroker); - MessagingChannelInterfaceFactory interfaceFactory = - new MessagingChannelInterfaceFactory((NCMessageBroker) messageBroker, messagingProperties); + MessagingChannelInterfaceFactory interfaceFactory = new MessagingChannelInterfaceFactory( + (NCMessageBroker) messageBroker, messagingProperties); 
this.ncServiceCtx.setMessagingChannelInterfaceFactory(interfaceFactory); IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager(); @@ -224,8 +228,8 @@ String[] ioDevices = ((PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository()) .getStorageMountingPoints(); for (String ioDevice : ioDevices) { - String tempDatasetsDir = - ioDevice + storageDirName + File.separator + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER; + String tempDatasetsDir = ioDevice + storageDirName + File.separator + + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER; File tmpDsDir = new File(tempDatasetsDir); if (tmpDsDir.exists()) { IoUtil.delete(tmpDsDir); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java index bdbf4a5..e3424ec 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java @@ -20,23 +20,24 @@ public interface IClusterManagementWork { - public enum WorkType { + enum WorkType { ADD_NODE, REMOVE_NODE } - public enum ClusterState { - STARTING, - PENDING, - ACTIVE, - UNUSABLE, - REBALANCING, - SHUTTING_DOWN + enum ClusterState { + STARTING, // the initial state + UNUSABLE, // one or more cluster partitions are inactive or max id resources have not been reported + PENDING, // the metadata node has not yet joined & initialized + RECOVERING, // global recovery has not yet completed + ACTIVE, // cluster is ACTIVE and ready for requests + REBALANCING, // replication is processing failbacks + SHUTTING_DOWN // a shutdown request has been received, and is underway } - public WorkType getClusterManagementWorkType(); + WorkType getClusterManagementWorkType(); - public int getWorkId(); + int getWorkId(); - public IClusterEventsSubscriber 
getSourceSubscriber(); + IClusterEventsSubscriber getSourceSubscriber(); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java index d36d383..ce49ccf 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java @@ -18,12 +18,14 @@ */ package org.apache.asterix.common.transactions; +import org.apache.hyracks.api.exceptions.HyracksDataException; + public interface IResourceIdManager { long createResourceId(); boolean reported(String nodeId); - void report(String nodeId, long maxResourceId); + void report(String nodeId, long maxResourceId) throws HyracksDataException; } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java index 8a3392a..a97e22a 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.metadata; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -29,7 +30,9 @@ public class GarbageCollector implements Runnable { private static final Logger LOGGER = Logger.getLogger(GarbageCollector.class.getName()); - private static final long CLEANUP_PERIOD = 3600L * 24; + // TODO(mblow): make this configurable + private static final long CLEANUP_PERIOD = 1; + private static final TimeUnit CLEANUP_PERIOD_UNIT = TimeUnit.DAYS; static { // Starts the garbage collector thread which @@ -40,13 +43,13 @@ } @Override - @SuppressWarnings("squid:S2142") // rethrow or 
interrupt thread on InterruptedException + @SuppressWarnings({"squid:S2142", "squid:S2189"}) // rethrow/interrupt thread on InterruptedException, endless loop public void run() { LOGGER.info("Starting Metadata GC"); while (true) { try { synchronized (this) { - this.wait(CLEANUP_PERIOD); + CLEANUP_PERIOD_UNIT.timedWait(this, CLEANUP_PERIOD); } MetadataManager.INSTANCE.cleanupTempDatasets(); } catch (InterruptedException e) { diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java index 82a1177..decc1a9 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java @@ -71,6 +71,6 @@ @Override public String toString() { - return ReportMaxResourceIdRequestMessage.class.getSimpleName(); + return ResourceIdRequestMessage.class.getSimpleName(); } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java index 6a5ed08..afa626d 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java @@ -24,6 +24,7 @@ import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.transactions.IResourceIdManager; +import org.apache.hyracks.api.exceptions.HyracksDataException; public class ResourceIdManager implements IResourceIdManager { @@ -59,13 +60,14 @@ } @Override - public synchronized void report(String nodeId, long maxResourceId) { + public synchronized void report(String nodeId, long maxResourceId) 
throws HyracksDataException { if (!allReported) { globalResourceId.set(Math.max(maxResourceId, globalResourceId.get())); reportedNodes.add(nodeId); if (reportedNodes.size() == csm.getNumberOfNodes()) { reportedNodes = null; allReported = true; + csm.refreshState(); } } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index 36cb10d..334b683 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -40,6 +40,7 @@ import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.replication.IFaultToleranceStrategy; +import org.apache.asterix.common.transactions.IResourceIdManager; import org.apache.asterix.event.schema.cluster.Cluster; import org.apache.asterix.event.schema.cluster.Node; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; @@ -118,6 +119,10 @@ @Override public synchronized void setState(ClusterState state) { + if (this.state == state) { + LOGGER.info("ignoring update to same cluster state of " + this.state); + return; + } LOGGER.info("updating cluster state from " + this.state + " to " + state.name()); this.state = state; appCtx.getGlobalRecoveryManager().notifyStateChange(state); @@ -166,24 +171,35 @@ setState(ClusterState.UNUSABLE); return; } - for (ClusterPartition p : clusterPartitions.values()) { if (!p.isActive()) { setState(ClusterState.UNUSABLE); return; } } - - // if all storage partitions are active as well as the metadata node, then the cluster is active + IResourceIdManager resourceIdManager = appCtx.getResourceIdManager(); + for (String node : activeNcConfiguration.keySet()) { + if 
(!resourceIdManager.reported(node)) { + LOGGER.log(Level.INFO, "Partitions are ready but %s has not yet registered its max resource id...", + node); + setState(ClusterState.UNUSABLE); + return; + } + } + // the metadata bootstrap & global recovery must be complete before the cluster can be active if (metadataNodeActive) { - if (state != ClusterState.ACTIVE) { + if (state != ClusterState.ACTIVE && state != ClusterState.RECOVERING) { setState(ClusterState.PENDING); } appCtx.getMetadataBootstrap().init(); - setState(ClusterState.ACTIVE); - notifyAll(); - // start global recovery - appCtx.getGlobalRecoveryManager().startGlobalRecovery(appCtx); + + if (appCtx.getGlobalRecoveryManager().isRecoveryCompleted()) { + setState(ClusterState.ACTIVE); + } else { + // start global recovery + setState(ClusterState.RECOVERING); + appCtx.getGlobalRecoveryManager().startGlobalRecovery(appCtx); + } } else { setState(ClusterState.PENDING); } @@ -269,8 +285,8 @@ clusterActiveLocations.add(p.getActiveNodeId()); } } - clusterPartitionConstraint = - new AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new String[] {})); + clusterPartitionConstraint = new AlgebricksAbsolutePartitionConstraint( + clusterActiveLocations.toArray(new String[] {})); } @Override @@ -432,8 +448,8 @@ } private void updateNodeConfig(String nodeId, Map<IOption, Object> configuration) { - ConfigManager configManager = - ((ConfigManagerApplicationConfig) appCtx.getServiceContext().getAppConfig()).getConfigManager(); + ConfigManager configManager = ((ConfigManagerApplicationConfig) appCtx.getServiceContext().getAppConfig()) + .getConfigManager(); for (Map.Entry<IOption, Object> entry : configuration.entrySet()) { if (entry.getKey().section() == Section.NC) { configManager.set(nodeId, entry.getKey(), entry.getValue()); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java index 3ce314f..1d22f85 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java @@ -21,12 +21,15 @@ import org.apache.hyracks.api.config.IConfigManager; import org.kohsuke.args4j.OptionHandlerFilter; +@SuppressWarnings("squid:S00112") // define and throw specific class of Exception public interface IApplication { - void start(IServiceContext ctx, String[] args) throws Exception; //NOSONAR + void init(IServiceContext serviceCtx) throws Exception; - void startupCompleted() throws Exception; //NOSONAR + void start(String[] args) throws Exception; - void stop() throws Exception; //NOSONAR + void startupCompleted() throws Exception; + + void stop() throws Exception; void registerConfig(IConfigManager configManager); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java index b94cf01..5ea51d1 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java @@ -40,7 +40,12 @@ } @Override - public void start(IServiceContext serviceCtx, String[] args) throws Exception { + public void init(IServiceContext serviceCtx) throws Exception { + // no-op + } + + @Override + public void start(String[] args) throws Exception { if (args.length > 0) { throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args)); } diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java index a243bf8..dfc79ed 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java @@ -198,9 +198,9 @@ clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.getClusterListenPort()), ccIPCI, new CCNCFunctions.SerializerDeserializer()); IIPCI ciIPCI = new ClientInterfaceIPCI(this, jobIdFactory); - clientIPC = - new IPCSystem(new InetSocketAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()), - ciIPCI, new JavaSerializationBasedPayloadSerializerDeserializer()); + clientIPC = new IPCSystem( + new InetSocketAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()), ciIPCI, + new JavaSerializationBasedPayloadSerializerDeserializer()); webServer = new WebServer(this, ccConfig.getConsoleListenPort()); clusterIPC.start(); clientIPC.start(); @@ -221,15 +221,16 @@ private void startApplication() throws Exception { serviceCtx = new CCServiceContext(this, serverCtx, ccContext, ccConfig.getAppConfig()); serviceCtx.addJobLifecycleListener(datasetDirectoryService); + application.init(serviceCtx); executor = Executors.newCachedThreadPool(serviceCtx.getThreadFactory()); - application.start(serviceCtx, ccConfig.getAppArgsArray()); + application.start(ccConfig.getAppArgsArray()); IJobCapacityController jobCapacityController = application.getJobCapacityController(); // Job manager is in charge of job lifecycle management. 
try { - Constructor<?> jobManagerConstructor = - this.getClass().getClassLoader().loadClass(ccConfig.getJobManagerClass()).getConstructor( - CCConfig.class, ClusterControllerService.class, IJobCapacityController.class); + Constructor<?> jobManagerConstructor = this.getClass().getClassLoader() + .loadClass(ccConfig.getJobManagerClass()) + .getConstructor(CCConfig.class, ClusterControllerService.class, IJobCapacityController.class); jobManager = (IJobManager) jobManagerConstructor.newInstance(ccConfig, this, jobCapacityController); } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { @@ -406,8 +407,8 @@ @Override public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws HyracksDataException { - GetIpAddressNodeNameMapWork ginmw = - new GetIpAddressNodeNameMapWork(ClusterControllerService.this.getNodeManager(), map); + GetIpAddressNodeNameMapWork ginmw = new GetIpAddressNodeNameMapWork( + ClusterControllerService.this.getNodeManager(), map); try { workQueue.scheduleAndSync(ginmw); } catch (Exception e) { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java index 5afc98d..4d8cbbd 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java @@ -41,7 +41,12 @@ } @Override - public void start(IServiceContext ncAppCtx, String[] args) throws Exception { + public void init(IServiceContext serviceCtx) throws Exception { + // no-op + } + + @Override + public void start(String[] args) throws Exception { if (args.length > 0) { throw new 
IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args)); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index b52675c..ed5598b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -194,8 +194,8 @@ // Set shutdown hook before so it doesn't have the same uncaught exception handler Runtime.getRuntime().addShutdownHook(new NCShutdownHook(this)); Thread.currentThread().setUncaughtExceptionHandler(getLifeCycleComponentManager()); - ioManager = - new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), application.getFileDeviceResolver()); + ioManager = new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), + application.getFileDeviceResolver()); workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread. jobletMap = new Hashtable<>(); @@ -336,8 +336,8 @@ // Use "public" versions of network addresses and ports NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress(); NetworkAddress netAddress = netManager.getPublicNetworkAddress(); - NetworkAddress meesagingPort = - messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress() : null; + NetworkAddress meesagingPort = messagingNetManager != null ? 
messagingNetManager.getPublicNetworkAddress() + : null; int allCores = osMXBean.getAvailableProcessors(); nodeRegistration = new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress, osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), allCores, runtimeMXBean.getVmName(), @@ -365,8 +365,9 @@ private void startApplication() throws Exception { serviceCtx = new NCServiceContext(this, serverCtx, ioManager, id, memoryManager, lccm, ncConfig.getAppConfig()); - application.start(serviceCtx, ncConfig.getAppArgsArray()); + application.init(serviceCtx); executor = Executors.newCachedThreadPool(serviceCtx.getThreadFactory()); + application.start(ncConfig.getAppArgsArray()); } public void updateMaxJobId(JobId jobId) { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java index dc0bf0c..d659fe6 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java @@ -19,7 +19,6 @@ package org.apache.hyracks.control.nc.application; import java.io.IOException; -import java.io.OutputStream; import java.io.Serializable; import org.apache.hyracks.api.application.INCServiceContext; @@ -54,13 +53,7 @@ this.ioManager = ioManager; this.memoryManager = memoryManager; this.ncs = ncs; - sdh = new IStateDumpHandler() { - - @Override - public void dumpState(OutputStream os) throws IOException { - lccm.dumpState(os); - } - }; + sdh = lccm::dumpState; } @Override diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java index 2967039..780a65c 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java @@ -30,11 +30,16 @@ private RuntimeContext rCtx; @Override - public void start(IServiceContext serviceCtx, String[] args) throws Exception { + public void init(IServiceContext serviceCtx) throws Exception { rCtx = new RuntimeContext((INCServiceContext) serviceCtx); } @Override + public void start(String[] args) throws Exception { + // No-op + } + + @Override public void startupCompleted() throws Exception { // No-op } -- To view, visit https://asterix-gerrit.ics.uci.edu/2004 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Id30415325047008c013e305ca11ccbb76bc7d8d8 Gerrit-PatchSet: 7 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Michael Blow <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
