abdullah alamoudi has submitted this change and it was merged. Change subject: [NO ISSUE][RT] Make start and cancel job uninterruptible ......................................................................
[NO ISSUE][RT] Make start and cancel job uninterruptible - user model changes: no - storage format changes: no - interface changes: no Details: - Previously, a thread starting a job through the Hyracks client connection might be interrupted before it received the job id. This would leak resources, since the job would keep running even though no one would ever read its result. - Similarly, job cancellation could be interrupted, in which case the job that was meant to be cancelled would continue running. - To avoid this, a new thread is added to the Hyracks client connection (HyracksConnection) that takes care of starting and cancelling jobs. The thread submitting these requests will be uninterruptible until those calls return. Change-Id: I27b2aaae902b19829bd2df2ae04c5e704f5ca8e8 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2639 Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: abdullah alamoudi <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java M hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java 4 files changed, 197 insertions(+), 16 deletions(-) Approvals: Anon. E.
Moose #1000171: abdullah alamoudi: Looks good to me, but someone else must approve Jenkins: Verified; ; Verified Murtadha Hubail: Looks good to me, approved Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java index edac0fa..f22693a 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java @@ -33,8 +33,6 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapred.InputSplit; -import org.apache.hyracks.api.client.HyracksConnection; -import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; @@ -53,19 +51,14 @@ /** a map from the NC name to the index */ private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>(); - /** a map from NC name to the NodeControllerInfo */ - private Map<String, NodeControllerInfo> ncNameToNcInfos; - /** * The constructor of the scheduler. 
* * @param ncNameToNcInfos * @throws HyracksException */ - public IndexingScheduler(String ipAddress, int port) throws HyracksException { + public IndexingScheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException { try { - IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port); - this.ncNameToNcInfos = hcc.getNodeControllerInfos(); loadIPAddressToNCMap(ncNameToNcInfos); } catch (Exception e) { throw HyracksException.create(e); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java index bd50352..1b13ec5 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java @@ -46,6 +46,7 @@ import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.api.application.ICCServiceContext; +import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.context.ICCContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; @@ -67,11 +68,11 @@ public static IndexingScheduler initializeIndexingHDFSScheduler(ICCServiceContext serviceCtx) throws HyracksDataException { - ICCContext ccContext = serviceCtx.getCCContext(); IndexingScheduler scheduler = null; try { - scheduler = new IndexingScheduler(ccContext.getClusterControllerInfo().getClientNetAddress(), - ccContext.getClusterControllerInfo().getClientNetPort()); + ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext(); + IHyracksClientConnection hcc = appCtx.getHcc(); + scheduler = new IndexingScheduler(hcc.getNodeControllerInfos()); } catch (HyracksException e) { 
throw new RuntimeDataException(ErrorCode.UTIL_HDFS_UTILS_CANNOT_OBTAIN_HDFS_SCHEDULER); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java index f635d94..cfa6f78 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java @@ -26,6 +26,11 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPut; @@ -44,10 +49,16 @@ import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.api.topology.ClusterTopology; +import org.apache.hyracks.api.util.InvokeUtil; import org.apache.hyracks.api.util.JavaSerializationUtils; import org.apache.hyracks.ipc.api.RPCInterface; import org.apache.hyracks.ipc.impl.IPCSystem; import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer; +import org.apache.hyracks.util.ExitUtil; +import org.apache.hyracks.util.InterruptibleAction; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** * Connection Class used by a Hyracks Client to interact with a Hyracks Cluster @@ -56,6 +67,9 @@ * @author vinayakb */ public final class HyracksConnection implements IHyracksClientConnection { + + private static final Logger LOGGER = LogManager.getLogger(); + private final String ccHost; private final int ccPort; @@ -65,6 +79,15 @@ private final 
IHyracksClientInterface hci; private final ClusterControllerInfo ccInfo; + + private volatile boolean running = false; + + private volatile long reqId = 0L; + + private final ExecutorService uninterruptibleExecutor = Executors.newFixedThreadPool(2, + r -> new Thread(r, "HyracksConnection Uninterrubtible thread: " + r.getClass().getSimpleName())); + + private final BlockingQueue<UnInterruptibleRequest<?>> uninterruptibles = new ArrayBlockingQueue<>(1); /** * Constructor to create a connection to the Hyracks Cluster Controller. @@ -86,6 +109,8 @@ hci = new HyracksClientInterfaceRemoteProxy(ipc.getReconnectingHandle(new InetSocketAddress(ccHost, ccPort)), rpci); ccInfo = hci.getClusterControllerInfo(); + uninterruptibleExecutor.execute(new UninterrubtileRequestHandler()); + uninterruptibleExecutor.execute(new UninterrubtileHandlerWatcher()); } @Override @@ -95,7 +120,8 @@ @Override public void cancelJob(JobId jobId) throws Exception { - hci.cancelJob(jobId); + CancelJobRequest request = new CancelJobRequest(jobId); + uninterruptiblySubmitAndExecute(request); } @Override @@ -131,12 +157,13 @@ @Override public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception { - return hci.startJob(deployedJobSpecId, jobParameters); + StartDeployedJobRequest request = new StartDeployedJobRequest(deployedJobSpecId, jobParameters); + return interruptiblySubmitAndExecute(request); } @Override public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception { - return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags); + return startJob(null, acggf, jobFlags); } public DeployedJobSpecId deployJobSpec(IActivityClusterGraphGeneratorFactory acggf) throws Exception { @@ -154,7 +181,7 @@ hci.waitForCompletion(jobId); } catch (InterruptedException e) { // Cancels an on-going job if the current thread gets interrupted. 
- hci.cancelJob(jobId); + cancelJob(jobId); throw e; } } @@ -232,7 +259,8 @@ @Override public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception { - return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags); + StartJobRequest request = new StartJobRequest(deploymentId, acggf, jobFlags); + return interruptiblySubmitAndExecute(request); } @Override @@ -269,4 +297,162 @@ public boolean isConnected() { return hci.isConnected(); } + + private <T> T uninterruptiblySubmitAndExecute(UnInterruptibleRequest<T> request) throws Exception { + InvokeUtil.doUninterruptibly(() -> uninterruptibles.put(request)); + return uninterruptiblyExecute(request); + } + + private <T> T uninterruptiblyExecute(UnInterruptibleRequest<T> request) throws Exception { + InvokeUtil.doUninterruptibly(request); + return request.result(); + } + + private <T> T interruptiblySubmitAndExecute(UnInterruptibleRequest<T> request) throws Exception { + uninterruptibles.put(request); + return uninterruptiblyExecute(request); + } + + private abstract class UnInterruptibleRequest<T> implements InterruptibleAction { + boolean completed = false; + boolean failed = false; + Throwable failure = null; + T response = null; + + @SuppressWarnings("squid:S1181") + private final void handle() { + try { + response = doHandle(); + } catch (Throwable th) { + failed = true; + failure = th; + } finally { + synchronized (this) { + completed = true; + notifyAll(); + } + } + } + + protected abstract T doHandle() throws Exception; + + @Override + public final synchronized void run() throws InterruptedException { + while (!completed) { + wait(); + } + } + + public T result() throws Exception { + if (failed) { + if (failure instanceof Error) { + throw (Error) failure; + } + throw (Exception) failure; + } + return response; + } + } + + private class CancelJobRequest extends UnInterruptibleRequest<Void> { + final JobId jobId; + + 
public CancelJobRequest(JobId jobId) { + this.jobId = jobId; + } + + @Override + protected Void doHandle() throws Exception { + hci.cancelJob(jobId); + return null; + } + + } + + private class StartDeployedJobRequest extends UnInterruptibleRequest<JobId> { + + private final DeployedJobSpecId deployedJobSpecId; + private final Map<byte[], byte[]> jobParameters; + + public StartDeployedJobRequest(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) { + this.deployedJobSpecId = deployedJobSpecId; + this.jobParameters = jobParameters; + } + + @Override + protected JobId doHandle() throws Exception { + return hci.startJob(deployedJobSpecId, jobParameters); + } + + } + + private class StartJobRequest extends UnInterruptibleRequest<JobId> { + private final DeploymentId deploymentId; + private final IActivityClusterGraphGeneratorFactory acggf; + private final EnumSet<JobFlag> jobFlags; + + public StartJobRequest(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf, + EnumSet<JobFlag> jobFlags) { + this.deploymentId = deploymentId; + this.acggf = acggf; + this.jobFlags = jobFlags; + } + + @Override + protected JobId doHandle() throws Exception { + if (deploymentId == null) { + return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags); + } else { + return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags); + } + } + + } + + private class UninterrubtileRequestHandler implements Runnable { + @SuppressWarnings({ "squid:S2189", "squid:S2142" }) + @Override + public void run() { + while (true) { + try { + UnInterruptibleRequest<?> next = uninterruptibles.take(); + reqId++; + running = true; + next.handle(); + } catch (InterruptedException e) { + LOGGER.log(Level.WARN, "Ignoring interrupt. 
This thread should never be interrupted."); + continue; + } finally { + running = false; + } + } + } + } + + public class UninterrubtileHandlerWatcher implements Runnable { + @Override + @SuppressWarnings({ "squid:S2189", "squid:S2142" }) + public void run() { + long currentReqId = 0L; + long currentTime = System.nanoTime(); + while (true) { + try { + TimeUnit.MINUTES.sleep(1); + } catch (InterruptedException e) { + LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted."); + continue; + } + if (running) { + if (reqId == currentReqId) { + if (TimeUnit.NANOSECONDS.toMinutes(System.nanoTime() - currentTime) > 0) { + ExitUtil.halt(ExitUtil.EC_FAILED_TO_PROCESS_UN_INTERRUPTIBLE_REQUEST); + } + } else { + currentReqId = reqId; + currentTime = System.nanoTime(); + } + } + } + } + } } diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java index 4aa123b..c8b9112 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java @@ -34,6 +34,7 @@ public static final int EC_FAILED_TO_STARTUP = 2; public static final int EC_FAILED_TO_RECOVER = 3; public static final int NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS = 4; + public static final int EC_FAILED_TO_PROCESS_UN_INTERRUPTIBLE_REQUEST = 5; public static final int EC_UNHANDLED_EXCEPTION = 11; public static final int EC_IMMEDIATE_HALT = 33; public static final int EC_HALT_ABNORMAL_RESERVED_44 = 44; -- To view, visit https://asterix-gerrit.ics.uci.edu/2639 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I27b2aaae902b19829bd2df2ae04c5e704f5ca8e8 Gerrit-PatchSet: 5 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> 
Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
