abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2639
Change subject: WIP start and cancel job in an uninterruptible thread
......................................................................
WIP start and cancel job in an uninterruptible thread
Change-Id: I27b2aaae902b19829bd2df2ae04c5e704f5ca8e8
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
3 files changed, 149 insertions(+), 16 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/39/2639/1
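
For orientation before the diff itself: the HyracksConnection change below routes startJob/cancelJob RPCs through a single dedicated worker thread so that an interrupt delivered to the calling thread cannot abort the request mid-flight. The following is a minimal, self-contained sketch of that pattern, not code from the patch; the names UninterruptibleRunnerSketch, submit, and Request are illustrative only.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class UninterruptibleRunnerSketch {

    private final BlockingQueue<Request> queue = new ArrayBlockingQueue<>(1);

    public UninterruptibleRunnerSketch() {
        Thread handler = new Thread(() -> {
            while (true) {
                try {
                    queue.take().run();
                } catch (InterruptedException e) {
                    // the worker is never expected to be interrupted; keep serving requests
                }
            }
        }, "uninterruptible-request-handler");
        handler.setDaemon(true);
        handler.start();
    }

    /** Hands the action to the worker and waits for it, deferring any interrupt until it is done. */
    public void submit(Runnable action) {
        Request request = new Request(action);
        boolean interrupted = false;
        while (true) { // keep trying to enqueue even if the caller is interrupted
            try {
                queue.put(request);
                break;
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        synchronized (request) {
            while (!request.done) { // keep waiting for completion even if the caller is interrupted
                try {
                    request.wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // re-assert the deferred interrupt for the caller
        }
    }

    private static final class Request {
        private final Runnable action;
        private boolean done;

        private Request(Runnable action) {
            this.action = action;
        }

        private void run() {
            try {
                action.run();
            } finally {
                synchronized (this) {
                    done = true;
                    notifyAll();
                }
            }
        }
    }
}

A caller would do runner.submit(() -> rpc.cancel(id)) and could safely be interrupted without losing the request; in the actual patch the same role is played by the UnInterruptibleRequest subclasses, the uninterruptibles queue, and InvokeUtil.doUninterruptibly.
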
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
index edac0fa..f22693a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
@@ -33,8 +33,6 @@
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
@@ -53,19 +51,14 @@
/** a map from the NC name to the index */
private Map<String, Integer> ncNameToIndex = new HashMap<String, Integer>();
- /** a map from NC name to the NodeControllerInfo */
- private Map<String, NodeControllerInfo> ncNameToNcInfos;
-
/**
* The constructor of the scheduler.
*
* @param ncNameToNcInfos
* @throws HyracksException
*/
- public IndexingScheduler(String ipAddress, int port) throws HyracksException {
+ public IndexingScheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) throws HyracksException {
try {
- IHyracksClientConnection hcc = new HyracksConnection(ipAddress, port);
- this.ncNameToNcInfos = hcc.getNodeControllerInfos();
loadIPAddressToNCMap(ncNameToNcInfos);
} catch (Exception e) {
throw HyracksException.create(e);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index bd50352..1b13ec5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -46,6 +46,7 @@
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.context.ICCContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
@@ -67,11 +68,11 @@
public static IndexingScheduler initializeIndexingHDFSScheduler(ICCServiceContext serviceCtx)
throws HyracksDataException {
- ICCContext ccContext = serviceCtx.getCCContext();
IndexingScheduler scheduler = null;
try {
- scheduler = new IndexingScheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
- ccContext.getClusterControllerInfo().getClientNetPort());
+ ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
+ IHyracksClientConnection hcc = appCtx.getHcc();
+ scheduler = new IndexingScheduler(hcc.getNodeControllerInfos());
} catch (HyracksException e) {
throw new RuntimeDataException(ErrorCode.UTIL_HDFS_UTILS_CANNOT_OBTAIN_HDFS_SCHEDULER);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 5b98778..de8cab5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -26,6 +26,10 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPut;
@@ -44,10 +48,15 @@
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.topology.ClusterTopology;
+import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.ipc.api.RPCInterface;
import org.apache.hyracks.ipc.impl.IPCSystem;
import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+import org.apache.hyracks.util.InterruptibleAction;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
/**
* Connection Class used by a Hyracks Client to interact with a Hyracks Cluster
@@ -56,6 +65,9 @@
* @author vinayakb
*/
public final class HyracksConnection implements IHyracksClientConnection {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+
private final String ccHost;
private final int ccPort;
@@ -65,6 +77,10 @@
private final IHyracksClientInterface hci;
private final ClusterControllerInfo ccInfo;
+
+ private final ExecutorService uninterruptibleExecutor =
+ Executors.newSingleThreadExecutor(r -> new Thread(r, "HyracksConnection Uninterruptible thread"));
+ private final BlockingQueue<UnInterruptibleRequest> uninterruptibles = new ArrayBlockingQueue<>(1);
/**
* Constructor to create a connection to the Hyracks Cluster Controller.
@@ -86,6 +102,7 @@
hci = new HyracksClientInterfaceRemoteProxy(ipc.getReconnectingHandle(new InetSocketAddress(ccHost, ccPort)),
rpci);
ccInfo = hci.getClusterControllerInfo();
+ uninterruptibleExecutor.execute(new UninterruptibleRequestHandler());
}
@Override
@@ -95,7 +112,10 @@
@Override
public void cancelJob(JobId jobId) throws Exception {
- hci.cancelJob(jobId);
+ CancelJobRequest request = new CancelJobRequest(jobId);
+ InvokeUtil.doUninterruptibly(() -> uninterruptibles.put(request));
+ InvokeUtil.doUninterruptibly(request);
+ request.result();
}
@Override
@@ -132,12 +152,15 @@
@Override
public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) throws Exception {
- return hci.startJob(deployedJobSpecId, jobParameters);
+ StartDeployedJobRequest request = new StartDeployedJobRequest(deployedJobSpecId, jobParameters);
+ uninterruptibles.put(request);
+ InvokeUtil.doUninterruptibly(request);
+ return request.result();
}
@Override
public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, EnumSet<JobFlag> jobFlags) throws Exception {
- return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
+ return startJob(null, acggf, jobFlags);
}
public DeployedJobSpecId deployJobSpec(IActivityClusterGraphGeneratorFactory acggf) throws Exception {
@@ -155,7 +178,7 @@
hci.waitForCompletion(jobId);
} catch (InterruptedException e) {
// Cancels an on-going job if the current thread gets interrupted.
- hci.cancelJob(jobId);
+ cancelJob(jobId);
throw e;
}
}
@@ -233,7 +256,10 @@
@Override
public JobId startJob(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf,
EnumSet<JobFlag> jobFlags) throws Exception {
- return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags);
+ StartJobRequest request = new StartJobRequest(deploymentId, acggf, jobFlags);
+ uninterruptibles.put(request);
+ InvokeUtil.doUninterruptibly(request);
+ return request.result();
}
@Override
@@ -270,4 +296,117 @@
public boolean isConnected() {
return hci.isConnected();
}
+
+ private abstract class UnInterruptibleRequest<T> implements InterruptibleAction {
+ boolean completed = false;
+ boolean failed = false;
+ Throwable failure = null;
+ T response = null;
+
+ @SuppressWarnings("squid:S1181")
+ private final void handle() {
+ try {
+ response = doHandle();
+ } catch (Throwable th) {
+ failed = true;
+ failure = th;
+ } finally {
+ synchronized (this) {
+ completed = true;
+ notifyAll();
+ }
+ }
+ }
+
+ protected abstract T doHandle() throws Exception;
+
+ @Override
+ public final synchronized void run() throws InterruptedException {
+ while (!completed) {
+ wait();
+ }
+ }
+
+ public T result() throws Exception {
+ if (failed) {
+ if (failure instanceof Error) {
+ throw (Error) failure;
+ }
+ throw (Exception) failure;
+ }
+ return response;
+ }
+ }
+
+ private class CancelJobRequest extends UnInterruptibleRequest<Void> {
+ final JobId jobId;
+
+ public CancelJobRequest(JobId jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ protected Void doHandle() throws Exception {
+ hci.cancelJob(jobId);
+ return null;
+ }
+
+ }
+
+ private class StartDeployedJobRequest extends UnInterruptibleRequest<JobId> {
+
+ private final DeployedJobSpecId deployedJobSpecId;
+ private final Map<byte[], byte[]> jobParameters;
+
+ public StartDeployedJobRequest(DeployedJobSpecId deployedJobSpecId, Map<byte[], byte[]> jobParameters) {
+ this.deployedJobSpecId = deployedJobSpecId;
+ this.jobParameters = jobParameters;
+ }
+
+ @Override
+ protected JobId doHandle() throws Exception {
+ return hci.startJob(deployedJobSpecId, jobParameters);
+ }
+
+ }
+
+ private class StartJobRequest extends UnInterruptibleRequest<JobId> {
+ private final DeploymentId deploymentId;
+ private final IActivityClusterGraphGeneratorFactory acggf;
+ private final EnumSet<JobFlag> jobFlags;
+
+ public StartJobRequest(DeploymentId deploymentId, IActivityClusterGraphGeneratorFactory acggf,
+ EnumSet<JobFlag> jobFlags) {
+ this.deploymentId = deploymentId;
+ this.acggf = acggf;
+ this.jobFlags = jobFlags;
+ }
+
+ @Override
+ protected JobId doHandle() throws Exception {
+ if (deploymentId == null) {
+ return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
+ } else {
+ return hci.startJob(deploymentId, JavaSerializationUtils.serialize(acggf), jobFlags);
+ }
+ }
+
+ }
+
+ private class UninterruptibleRequestHandler implements Runnable {
+
+ @SuppressWarnings({ "squid:S2189", "squid:S2142" })
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ uninterruptibles.take().handle();
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted.");
+ continue;
+ }
+ }
+ }
+
+ }
}
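
A note on InvokeUtil.doUninterruptibly, which the new cancelJob/startJob paths call: its implementation is not part of this diff, so the following is only a sketch of the assumed behavior (retry the InterruptibleAction until it completes, then restore the thread's interrupt status), not a copy of the real helper in org.apache.hyracks.api.util.InvokeUtil.

// Assumed behavior only; the real helper lives in org.apache.hyracks.api.util.InvokeUtil.
public final class InvokeUtilSketch {

    /** Mirrors org.apache.hyracks.util.InterruptibleAction as used in the patch above. */
    @FunctionalInterface
    public interface InterruptibleAction {
        void run() throws InterruptedException;
    }

    private InvokeUtilSketch() {
    }

    public static void doUninterruptibly(InterruptibleAction action) {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    action.run(); // may block, e.g. BlockingQueue.put or Object.wait
                    return;
                } catch (InterruptedException e) {
                    interrupted = true; // remember the interrupt and retry until the action finishes
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt(); // hand the deferred interrupt back to the caller
            }
        }
    }
}
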
--
To view, visit https://asterix-gerrit.ics.uci.edu/2639
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I27b2aaae902b19829bd2df2ae04c5e704f5ca8e8
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>