abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2639

Change subject: WIP start and cancel job in an uninterruptible thread
......................................................................

WIP start and cancel job in an uninterruptible thread

Change-Id: I27b2aaae902b19829bd2df2ae04c5e704f5ca8e8
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
3 files changed, 149 insertions(+), 16 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/39/2639/1

diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
index edac0fa..f22693a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
@@ -33,8 +33,6 @@
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
@@ -53,19 +51,14 @@
     /** a map from the NC name to the index */
     private Map<String, Integer> ncNameToIndex = new HashMap<String, 
Integer>();
 
-    /** a map from NC name to the NodeControllerInfo */
-    private Map<String, NodeControllerInfo> ncNameToNcInfos;
-
     /**
      * The constructor of the scheduler.
      *
      * @param ncNameToNcInfos
      * @throws HyracksException
      */
-    public IndexingScheduler(String ipAddress, int port) throws 
HyracksException {
+    public IndexingScheduler(Map<String, NodeControllerInfo> ncNameToNcInfos) 
throws HyracksException {
         try {
-            IHyracksClientConnection hcc = new HyracksConnection(ipAddress, 
port);
-            this.ncNameToNcInfos = hcc.getNodeControllerInfos();
             loadIPAddressToNCMap(ncNameToNcInfos);
         } catch (Exception e) {
             throw HyracksException.create(e);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index bd50352..1b13ec5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -46,6 +46,7 @@
 import org.apache.hadoop.mapred.TextInputFormat;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.context.ICCContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
@@ -67,11 +68,11 @@
 
     public static IndexingScheduler 
initializeIndexingHDFSScheduler(ICCServiceContext serviceCtx)
             throws HyracksDataException {
-        ICCContext ccContext = serviceCtx.getCCContext();
         IndexingScheduler scheduler = null;
         try {
-            scheduler = new 
IndexingScheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
-                    ccContext.getClusterControllerInfo().getClientNetPort());
+            ICcApplicationContext appCtx = (ICcApplicationContext) 
serviceCtx.getApplicationContext();
+            IHyracksClientConnection hcc = appCtx.getHcc();
+            scheduler = new IndexingScheduler(hcc.getNodeControllerInfos());
         } catch (HyracksException e) {
             throw new 
RuntimeDataException(ErrorCode.UTIL_HDFS_UTILS_CANNOT_OBTAIN_HDFS_SCHEDULER);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index 5b98778..de8cab5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -26,6 +26,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.http.HttpResponse;
 import org.apache.http.client.methods.HttpPut;
@@ -44,10 +48,15 @@
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.topology.ClusterTopology;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.ipc.api.RPCInterface;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import 
org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+import org.apache.hyracks.util.InterruptibleAction;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 /**
  * Connection Class used by a Hyracks Client to interact with a Hyracks Cluster
@@ -56,6 +65,9 @@
  * @author vinayakb
  */
 public final class HyracksConnection implements IHyracksClientConnection {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
     private final String ccHost;
 
     private final int ccPort;
@@ -65,6 +77,10 @@
     private final IHyracksClientInterface hci;
 
     private final ClusterControllerInfo ccInfo;
+
+    private final ExecutorService uninterruptibleExecutor =
+            Executors.newSingleThreadExecutor(r -> new Thread(r, 
"HyracksConnection Uninterrubtible thread"));
+    private final BlockingQueue<UnInterruptibleRequest> uninterruptibles = new 
ArrayBlockingQueue<>(1);
 
     /**
      * Constructor to create a connection to the Hyracks Cluster Controller.
@@ -86,6 +102,7 @@
         hci = new 
HyracksClientInterfaceRemoteProxy(ipc.getReconnectingHandle(new 
InetSocketAddress(ccHost, ccPort)),
                 rpci);
         ccInfo = hci.getClusterControllerInfo();
+        uninterruptibleExecutor.execute(new UninterrubtileRequestHandler());
     }
 
     @Override
@@ -95,7 +112,10 @@
 
     @Override
     public void cancelJob(JobId jobId) throws Exception {
-        hci.cancelJob(jobId);
+        CancelJobRequest request = new CancelJobRequest(jobId);
+        InvokeUtil.doUninterruptibly(() -> uninterruptibles.put(request));
+        InvokeUtil.doUninterruptibly(request);
+        request.result();
     }
 
     @Override
@@ -132,12 +152,15 @@
 
     @Override
     public JobId startJob(DeployedJobSpecId deployedJobSpecId, Map<byte[], 
byte[]> jobParameters) throws Exception {
-        return hci.startJob(deployedJobSpecId, jobParameters);
+        StartDeployedJobRequest request = new 
StartDeployedJobRequest(deployedJobSpecId, jobParameters);
+        uninterruptibles.put(request);
+        InvokeUtil.doUninterruptibly(request);
+        return request.result();
     }
 
     @Override
     public JobId startJob(IActivityClusterGraphGeneratorFactory acggf, 
EnumSet<JobFlag> jobFlags) throws Exception {
-        return hci.startJob(JavaSerializationUtils.serialize(acggf), jobFlags);
+        return startJob(null, acggf, jobFlags);
     }
 
     public DeployedJobSpecId 
deployJobSpec(IActivityClusterGraphGeneratorFactory acggf) throws Exception {
@@ -155,7 +178,7 @@
             hci.waitForCompletion(jobId);
         } catch (InterruptedException e) {
             // Cancels an on-going job if the current thread gets interrupted.
-            hci.cancelJob(jobId);
+            cancelJob(jobId);
             throw e;
         }
     }
@@ -233,7 +256,10 @@
     @Override
     public JobId startJob(DeploymentId deploymentId, 
IActivityClusterGraphGeneratorFactory acggf,
             EnumSet<JobFlag> jobFlags) throws Exception {
-        return hci.startJob(deploymentId, 
JavaSerializationUtils.serialize(acggf), jobFlags);
+        StartJobRequest request = new StartJobRequest(deploymentId, acggf, 
jobFlags);
+        uninterruptibles.put(request);
+        InvokeUtil.doUninterruptibly(request);
+        return request.result();
     }
 
     @Override
@@ -270,4 +296,117 @@
     public boolean isConnected() {
         return hci.isConnected();
     }
+
+    private abstract class UnInterruptibleRequest<T> implements 
InterruptibleAction {
+        boolean completed = false;
+        boolean failed = false;
+        Throwable failure = null;
+        T response = null;
+
+        @SuppressWarnings("squid:S1181")
+        private final void handle() {
+            try {
+                response = doHandle();
+            } catch (Throwable th) {
+                failed = true;
+                failure = th;
+            } finally {
+                synchronized (this) {
+                    completed = true;
+                    notifyAll();
+                }
+            }
+        }
+
+        protected abstract T doHandle() throws Exception;
+
+        @Override
+        public final synchronized void run() throws InterruptedException {
+            while (!completed) {
+                wait();
+            }
+        }
+
+        public T result() throws Exception {
+            if (failed) {
+                if (failure instanceof Error) {
+                    throw (Error) failure;
+                }
+                throw (Exception) failure;
+            }
+            return response;
+        }
+    }
+
+    private class CancelJobRequest extends UnInterruptibleRequest<Void> {
+        final JobId jobId;
+
+        public CancelJobRequest(JobId jobId) {
+            this.jobId = jobId;
+        }
+
+        @Override
+        protected Void doHandle() throws Exception {
+            hci.cancelJob(jobId);
+            return null;
+        }
+
+    }
+
+    private class StartDeployedJobRequest extends 
UnInterruptibleRequest<JobId> {
+
+        private final DeployedJobSpecId deployedJobSpecId;
+        private final Map<byte[], byte[]> jobParameters;
+
+        public StartDeployedJobRequest(DeployedJobSpecId deployedJobSpecId, 
Map<byte[], byte[]> jobParameters) {
+            this.deployedJobSpecId = deployedJobSpecId;
+            this.jobParameters = jobParameters;
+        }
+
+        @Override
+        protected JobId doHandle() throws Exception {
+            return hci.startJob(deployedJobSpecId, jobParameters);
+        }
+
+    }
+
+    private class StartJobRequest extends UnInterruptibleRequest<JobId> {
+        private final DeploymentId deploymentId;
+        private final IActivityClusterGraphGeneratorFactory acggf;
+        private final EnumSet<JobFlag> jobFlags;
+
+        public StartJobRequest(DeploymentId deploymentId, 
IActivityClusterGraphGeneratorFactory acggf,
+                EnumSet<JobFlag> jobFlags) {
+            this.deploymentId = deploymentId;
+            this.acggf = acggf;
+            this.jobFlags = jobFlags;
+        }
+
+        @Override
+        protected JobId doHandle() throws Exception {
+            if (deploymentId == null) {
+                return hci.startJob(JavaSerializationUtils.serialize(acggf), 
jobFlags);
+            } else {
+                return hci.startJob(deploymentId, 
JavaSerializationUtils.serialize(acggf), jobFlags);
+            }
+        }
+
+    }
+
+    private class UninterrubtileRequestHandler implements Runnable {
+
+        @SuppressWarnings({ "squid:S2189", "squid:S2142" })
+        @Override
+        public void run() {
+            while (true) {
+                try {
+                    uninterruptibles.take().handle();
+                } catch (InterruptedException e) {
+                    LOGGER.log(Level.WARN, "Ignoring interrupt. This thread 
should never be interrupted.");
+                    continue;
+                }
+            }
+        }
+
+    }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2639
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I27b2aaae902b19829bd2df2ae04c5e704f5ca8e8
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>

Reply via email to