abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2404
Change subject: PLEASE EDIT to provide a meaningful commit message! ...................................................................... PLEASE EDIT to provide a meaningful commit message! The following commits from your working branch will be included: commit 43228caa7733c093dd98dff6b9ec04a940f1b4b3 Author: Abdullah Alamoudi <[email protected]> Date: Sun Feb 18 14:09:26 2018 -0800 [NO ISSUE][ING] Prevent duplicate active runtimes in NCs - user model changes: no - storage format changes: no - interface changes: yes - Add getApplication() to NodeControllerService - Replace INCApplication.onRegisterNode(CcId) with completeRegistration(CcId) details: - Previously, registering an active runtime whose runtime id was already registered was silently ignored. - After this change, such a registration throws an exception. - In addition, this change ensures that all previous tasks of a CC on an NC have completed before the registration completes. Change-Id: I2626046ca6e809d964497a6e075f5b8848a7c1ea --- M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java 
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java M hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java 13 files changed, 121 insertions(+), 15 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/04/2404/1 diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java index f9aef4c..77ca2c2 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java @@ -73,7 +73,9 @@ if (shutdown) { throw new RuntimeDataException(ErrorCode.ACTIVE_MANAGER_SHUTDOWN); } - runtimes.putIfAbsent(runtime.getRuntimeId(), runtime); + if (runtimes.putIfAbsent(runtime.getRuntimeId(), runtime) != null) { + throw new RuntimeDataException(ErrorCode.ACTIVE_RUNTIME_ALREADY_REGISTERED, runtime.getRuntimeId()); + } } public void deregisterRuntime(ActiveRuntimeId id) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java index 1758daa..0902203 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -182,8 +182,9 @@ protected void finish(ActiveEvent event) throws HyracksDataException { LOGGER.log(level, "the job " + jobId + " finished"); if (numRegistered != numDeRegistered) { - 
LOGGER.log(Level.WARN, "the job " + jobId + " finished with reported runtime registrations = " - + numRegistered + " and deregistrations = " + numDeRegistered + " on node controllers"); + LOGGER.log(Level.WARN, + "the job {} finished with reported runtime registrations = {} and deregistrations = {} on node controllers", + jobId, numRegistered, numDeRegistered); } jobId = null; Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, List<Exception>>) event.getEventObject(); @@ -194,8 +195,7 @@ jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION) : exceptions.get(0); setState((state == ActivityState.STOPPING) ? ActivityState.STOPPED : ActivityState.TEMPORARILY_FAILED); - if (prevState != ActivityState.SUSPENDING && prevState != ActivityState.RECOVERING - && prevState != ActivityState.RESUMING && prevState != ActivityState.STOPPING) { + if (prevState == ActivityState.RUNNING) { recover(); } } else { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index b0382f7..6b59942 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -207,7 +207,7 @@ } @Override - public void onRegisterNode(CcId ccId) throws Exception { + public void completeRegistration(CcId ccId) throws HyracksDataException { if (startupCompleted) { /* * If the node completed its startup before, then this is a re-registration with diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index 1a83603..bc5c365 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -255,6 +255,7 @@ public static final int FEED_FAILED_WHILE_GETTING_A_NEW_RECORD = 3110; public static final int FEED_START_FEED_WITHOUT_CONNECTION = 3111; public static final int PARSER_COLLECTION_ITEM_CANNOT_BE_NULL = 3112; + public static final int ACTIVE_RUNTIME_ALREADY_REGISTERED = 3113; // Lifecycle management errors public static final int DUPLICATE_PARTITION_ID = 4000; diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index 995b541..9bf513e 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -244,6 +244,7 @@ 3110 = Feed failed while reading a new record 3111 = Feed %1$s is not connected to any dataset 3112 = Array/Multiset item cannot be null +3113 = Active Runtime %1$s is already registered # Lifecycle management errors 4000 = Partition id %1$d for node %2$s already in use by node %3$s diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java index af6cb92..9a07fb8 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplication.java @@ -19,6 +19,7 @@ package org.apache.hyracks.api.application; import org.apache.hyracks.api.control.CcId; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IFileDeviceResolver; import org.apache.hyracks.api.job.resource.NodeCapacity; @@ -34,5 +35,14 @@ */ IFileDeviceResolver getFileDeviceResolver(); - void onRegisterNode(CcId ccId) throws 
Exception; + /** + * Called after the node has ensured that no resources left over + * from previous registrations remain + * + * @param ccId + * the id of the cluster controller + * @throws HyracksDataException + * if completing the registration fails + */ + void completeRegistration(CcId ccId) throws HyracksDataException; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java index f8fe77f..936f6dd 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java @@ -328,7 +328,6 @@ } private void stopApplication() throws Exception { - application.stop(); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java index ea16032..fe18461 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java @@ -26,6 +26,7 @@ import org.apache.hyracks.api.config.IConfigManager; import org.apache.hyracks.api.config.Section; import org.apache.hyracks.api.control.CcId; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IFileDeviceResolver; import org.apache.hyracks.api.job.resource.NodeCapacity; import org.apache.hyracks.api.util.HyracksConstants; @@
-60,7 +61,7 @@ } @Override - public void onRegisterNode(CcId ccId) throws Exception { + public void completeRegistration(CcId ccId) throws HyracksDataException { // no-op } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 0f40b60..1883947 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -62,6 +62,7 @@ import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager; import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager; import org.apache.hyracks.api.service.IControllerService; +import org.apache.hyracks.api.util.InvokeUtil; import org.apache.hyracks.control.common.base.IClusterController; import org.apache.hyracks.control.common.config.ConfigManager; import org.apache.hyracks.control.common.context.ServerContext; @@ -95,7 +96,6 @@ import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory; import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters; import org.apache.hyracks.util.ExitUtil; -import org.apache.hyracks.api.util.InvokeUtil; import org.apache.hyracks.util.PidHelper; import org.apache.hyracks.util.trace.ITracer; import org.apache.hyracks.util.trace.Tracer; @@ -419,7 +419,6 @@ if (distributedState != null) { getDistributedState().put(ccId, distributedState); } - application.onRegisterNode(ccId); IClusterController ccs = ccc.getClusterControllerService(); NodeParameters nodeParameters = ccc.getNodeParameters(); @@ -798,4 +797,8 @@ return application.getApplicationContext(); } + public INCApplication getApplication() { + return application; 
+ } + } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java index 9b32cc7..16c8156 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java @@ -102,6 +102,8 @@ private volatile boolean aborted; + private volatile boolean completed = false; + private NodeControllerService ncs; private List<List<PartitionChannel>> inputChannelsFromConnectors; @@ -255,8 +257,7 @@ if (aborted) { return false; } - pendingThreads.add(t); - return true; + return pendingThreads.add(t); } private synchronized void removePendingThread(Thread t) { @@ -276,6 +277,7 @@ public void run() { Thread ct = Thread.currentThread(); String threadName = ct.getName(); + ct.setName(displayName + ":" + taskAttemptId + ":" + 0); // Calls synchronized addPendingThread(..) to make sure that in the abort() method, // the thread is not escaped from interruption. 
if (!addPendingThread(ct)) { @@ -285,7 +287,6 @@ .schedule(new NotifyTaskFailureWork(ncs, this, exceptions, joblet.getJobId(), taskAttemptId)); return; } - ct.setName(displayName + ":" + taskAttemptId + ":" + 0); try { Exception operatorException = null; try { @@ -354,6 +355,7 @@ ct.setName(threadName); close(); removePendingThread(ct); + completed = true; } if (!exceptions.isEmpty()) { if (LOGGER.isWarnEnabled()) { @@ -461,6 +463,7 @@ return ncs.createOrGetJobParameterByteStore(joblet.getJobId()).getParameterValue(name, start, length); } + @Override public Set<JobFlag> getJobFlags() { return jobFlags; } @@ -469,4 +472,8 @@ public IStatsCollector getStatsCollector() { return statsCollector; } + + public boolean isCompleted() { + return completed; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java index 2bcf414..40c0e97 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java @@ -18,7 +18,9 @@ */ package org.apache.hyracks.control.nc.work; +import java.util.ArrayDeque; import java.util.Collection; +import java.util.Deque; import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.dataset.IDatasetPartitionManager; @@ -50,12 +52,14 @@ if (dpm == null) { LOGGER.log(Level.WARN, "DatasetPartitionManager is null on " + ncs.getId()); } + Deque<Task> abortedTasks = new ArrayDeque<>(); Collection<Joblet> joblets = ncs.getJobletMap().values(); // TODO(mblow): should we have one jobletmap per cc? 
joblets.stream().filter(joblet -> joblet.getJobId().getCcId().equals(ccId)).forEach(joblet -> { Collection<Task> tasks = joblet.getTaskMap().values(); for (Task task : tasks) { task.abort(); + abortedTasks.add(task); } final JobId jobId = joblet.getJobId(); if (dpm != null) { @@ -64,5 +68,6 @@ } ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, jobId, JobStatus.FAILURE)); }); + ncs.getWorkQueue().schedule(new EnsureAllCcTasksCompleted(ncs, ccId, abortedTasks)); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java new file mode 100644 index 0000000..8c609ec --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.control.nc.work; + +import java.util.Deque; +import java.util.concurrent.TimeUnit; + +import org.apache.hyracks.api.control.CcId; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.control.common.work.AbstractWork; +import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.hyracks.control.nc.Task; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class EnsureAllCcTasksCompleted extends AbstractWork { + + private static final Logger LOGGER = LogManager.getLogger(); + private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(2); + private final NodeControllerService ncs; + private final CcId ccId; + private final Deque<Task> abortedTasks; + private final long startTime; + + public EnsureAllCcTasksCompleted(NodeControllerService ncs, CcId ccId, Deque<Task> abortedTasks) { + this.ncs = ncs; + this.ccId = ccId; + this.abortedTasks = abortedTasks; + startTime = System.currentTimeMillis(); + } + + @Override + public void run() { + int numTasks = abortedTasks.size(); + for (int i = 0; i < numTasks; i++) { + Task task = abortedTasks.poll(); + if (!task.isCompleted()) { + abortedTasks.add(task); + } + } + if (abortedTasks.isEmpty()) { + // all tasks have completed + try { + ncs.getApplication().completeRegistration(ccId); + } catch (HyracksDataException e) { + LOGGER.log(Level.WARN, "Failed to complete registration", e); + } + } else { + if (System.currentTimeMillis() - startTime > TIMEOUT) { + LOGGER.log(Level.ERROR, + "Failed to abort all previous tasks associated with CC {} after {}ms.
Giving up", ccId, + TIMEOUT); + } else { + ncs.getWorkQueue().schedule(this); + } + } + } + +} diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java index 15248e7..ab9aaec 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/TestNCApplication.java @@ -23,6 +23,7 @@ import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.config.IConfigManager; import org.apache.hyracks.api.control.CcId; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IFileDeviceResolver; import org.apache.hyracks.api.job.resource.NodeCapacity; @@ -46,7 +47,7 @@ } @Override - public void onRegisterNode(CcId ccs) throws Exception { + public void completeRegistration(CcId ccs) throws HyracksDataException { // No-op } -- To view, visit https://asterix-gerrit.ics.uci.edu/2404 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I2626046ca6e809d964497a6e075f5b8848a7c1ea Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]>
