Till Westmann has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1709

Change subject: Enable HTTP API processing on NCs
......................................................................

Enable HTTP API processing on NCs

- Query/Status/Result are answered by NC nodes
- other HTTP requests are proxied to the CC node
- SessionConfig refactoring – split into config and output (SessionOutput)
- TestExecutor now can send http requests do multiple nodes (round robin)

Change-Id: I19414a23e163fc4deef9805c8f9089609f1ebe07
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
A 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
M 
asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
M 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
A 
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ProxyHandler.java
36 files changed, 1,030 insertions(+), 337 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/09/1709/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
index b0f863b..d822439 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
@@ -22,7 +22,6 @@
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
-import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
@@ -42,14 +41,14 @@
      * Called when the {@code IStatementExecutor} encounters an extension 
statement.
      * An implementation class should implement the actual processing of the 
statement in this method.
      *
-     * @param queryTranslator
-     * @param metadataProvider
      * @param statementExecutor
+     * @param metadataProvider
      * @param hcc
+     * @param outMetadata
      * @param resultSetIdCounter
      * @throws Exception
      */
-    void handle(IStatementExecutor statementExecutor, MetadataProvider 
metadataProvider,
-            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery 
resultDelivery, Stats stats,
+    void handle(IStatementExecutor statementExecutor, MetadataProvider 
metadataProvider, IHyracksClientConnection hcc,
+            IHyracksDataset hdc, ResultDelivery resultDelivery, 
IStatementExecutor.ResultMetadata outMetadata,
             int resultSetIdCounter) throws HyracksDataException, 
AlgebricksException;
 }
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 92c487b..4ef2705 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -18,17 +18,24 @@
  */
 package org.apache.asterix.translator;
 
+import java.io.Serializable;
 import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.lang.common.statement.Query;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IClusterInfoCollector;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 
 /**
@@ -52,6 +59,16 @@
          * A result handle is returned before the resutlts are complete
          */
         ASYNC
+    }
+
+    class ResultMetadata implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final List<Triple<JobId, ResultSetId, ARecordType>> resultSets 
= new ArrayList<>();
+
+        public List<Triple<JobId, ResultSetId, ARecordType>> getResultSets() {
+            return resultSets;
+        }
     }
 
     public static class Stats {
@@ -85,12 +102,14 @@
      *            A Hyracks dataset client object that is used to read the 
results.
      * @param resultDelivery
      *            The {@code ResultDelivery} kind required for queries in the 
list of statements
+     * @param outMetadata
+     *            a reference to write the metadata of executed queries
      * @param stats
      *            a reference to write the stats of executed queries
      * @throws Exception
      */
     void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, 
ResultDelivery resultDelivery,
-            Stats stats) throws Exception;
+            ResultMetadata outMetadata, Stats stats) throws Exception;
 
     /**
      * Compiles and execute a list of statements, with passing in client 
context id and context.
@@ -101,6 +120,8 @@
      *            A Hyracks dataset client object that is used to read the 
results.
      * @param resultDelivery
      *            The {@code ResultDelivery} kind required for queries in the 
list of statements
+     * @param outMetadata
+     *            a reference to write the metadata of executed queries
      * @param stats
      *            a reference to write the stats of executed queries
      * @param clientContextId
@@ -110,7 +131,7 @@
      * @throws Exception
      */
     void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, 
ResultDelivery resultDelivery,
-            Stats stats, String clientContextId, IStatementExecutorContext 
ctx) throws Exception;
+            ResultMetadata outMetadata, Stats stats, String clientContextId, 
IStatementExecutorContext ctx) throws Exception;
 
     /**
      * rewrites and compiles query into a hyracks job specifications
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java
index 23365de..b244c0c 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java
@@ -37,14 +37,14 @@
      *
      * @param statements
      *            Statements to execute
-     * @param conf
-     *            request configuration
+     * @param output
+     *            output and request configuration
      * @param compilationProvider
      *            provides query language related components
      * @param storageComponentProvider
      *            provides storage related components
      * @return an implementation of {@code IStatementExecutor} thaxt is used 
to execute the passed list of statements
      */
-    IStatementExecutor create(ICcApplicationContext appCtx, List<Statement> 
statements, SessionConfig conf,
+    IStatementExecutor create(ICcApplicationContext appCtx, List<Statement> 
statements, SessionOutput output,
             ILangCompilationProvider compilationProvider, 
IStorageComponentProvider storageComponentProvider);
 }
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
index a637e2f..cfd4e87 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
@@ -18,12 +18,9 @@
  */
 package org.apache.asterix.translator;
 
-import java.io.PrintWriter;
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import 
org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
 
 /**
  * SessionConfig captures several different parameters for controlling
@@ -38,8 +35,10 @@
  * execution output - LOSSLESS_JSON, CSV, etc.
  * <li>It allows you to specify output format-specific parameters.
  */
+public class SessionConfig implements Serializable {
 
-public class SessionConfig {
+    private static final long serialVersionUID = 1L;
+
     /**
      * Used to specify the output format for the primary execution.
      */
@@ -105,53 +104,25 @@
      */
     public static final String FORMAT_QUOTE_RECORD = "quote-record";
 
-    @FunctionalInterface
-    public interface ResultDecorator {
-        AlgebricksAppendable append(AlgebricksAppendable app) throws 
AlgebricksException;
-    }
-
-    @FunctionalInterface
-    public interface ResultAppender {
-        AlgebricksAppendable append(AlgebricksAppendable app, String str) 
throws AlgebricksException;
-    }
+    // Output format.
+    private final OutputFormat fmt;
 
     // Standard execution flags.
     private final boolean executeQuery;
     private final boolean generateJobSpec;
     private final boolean optimize;
 
-    // Output path for primary execution.
-    private final PrintWriter out;
-
-    // Output format.
-    private final OutputFormat fmt;
-
-    private final ResultDecorator preResultDecorator;
-    private final ResultDecorator postResultDecorator;
-    private final ResultAppender handleAppender;
-    private final ResultAppender statusAppender;
-
     // Flags.
     private final Map<String, Boolean> flags;
 
-    public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator 
preResultDecorator,
-            ResultDecorator postResultDecorator, ResultAppender 
handleAppender, ResultAppender statusAppender) {
-        this(out, fmt, preResultDecorator, postResultDecorator, 
handleAppender, statusAppender,
-                true, true, true);
-    }
-
-    public SessionConfig(PrintWriter out, OutputFormat fmt, boolean optimize, 
boolean executeQuery,
-            boolean generateJobSpec) {
-        this(out, fmt, null, null, null, null, optimize, executeQuery, 
generateJobSpec);
+    public SessionConfig(OutputFormat fmt) {
+        this(fmt, true, true, true);
     }
 
     /**
      * Create a SessionConfig object with all optional values set to defaults:
      * - All format flags set to "false".
      * - All out-of-band outputs set to "false".
-     *
-     * @param out
-     *            PrintWriter for execution output.
      * @param fmt
      *            Output format for execution output.
      * @param optimize
@@ -160,17 +131,9 @@
      *            Whether to execute the query or not.
      * @param generateJobSpec
      *            Whether to generate the Hyracks job specification (if
-     *            false, job cannot be executed).
      */
-    public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator 
preResultDecorator,
-            ResultDecorator postResultDecorator, ResultAppender 
handleAppender, ResultAppender statusAppender,
-            boolean optimize, boolean executeQuery, boolean generateJobSpec) {
-        this.out = out;
+    public SessionConfig(OutputFormat fmt, boolean optimize, boolean 
executeQuery, boolean generateJobSpec) {
         this.fmt = fmt;
-        this.preResultDecorator = preResultDecorator;
-        this.postResultDecorator = postResultDecorator;
-        this.handleAppender = handleAppender;
-        this.statusAppender = statusAppender;
         this.optimize = optimize;
         this.executeQuery = executeQuery;
         this.generateJobSpec = generateJobSpec;
@@ -178,33 +141,10 @@
     }
 
     /**
-     * Retrieve the PrintWriter to produce output to.
-     */
-    public PrintWriter out() {
-        return this.out;
-    }
-
-    /**
      * Retrieve the OutputFormat for this execution.
      */
     public OutputFormat fmt() {
         return this.fmt;
-    }
-
-    public AlgebricksAppendable resultPrefix(AlgebricksAppendable app) throws 
AlgebricksException {
-        return this.preResultDecorator != null ? 
this.preResultDecorator.append(app) : app;
-    }
-
-    public AlgebricksAppendable resultPostfix(AlgebricksAppendable app) throws 
AlgebricksException {
-        return this.postResultDecorator != null ? 
this.postResultDecorator.append(app) : app;
-    }
-
-    public AlgebricksAppendable appendHandle(AlgebricksAppendable app, String 
handle) throws AlgebricksException {
-        return this.handleAppender != null ? this.handleAppender.append(app, 
handle) : app;
-    }
-
-    public AlgebricksAppendable appendStatus(AlgebricksAppendable app, String 
status) throws AlgebricksException {
-        return this.statusAppender != null ? this.statusAppender.append(app, 
status) : app;
     }
 
     /**
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java
new file mode 100644
index 0000000..b559df8
--- /dev/null
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.translator;
+
+import java.io.PrintWriter;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import 
org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
+
+public class SessionOutput {
+    private final SessionConfig config;
+
+    // Output path for primary execution.
+    private final PrintWriter out;
+
+    private final SessionOutput.ResultDecorator preResultDecorator;
+    private final SessionOutput.ResultDecorator postResultDecorator;
+    private final SessionOutput.ResultAppender handleAppender;
+    private final SessionOutput.ResultAppender statusAppender;
+
+    public SessionOutput(SessionConfig config, PrintWriter out) {
+        this(config, out, null, null, null, null);
+    }
+
+    public SessionOutput(SessionConfig config, PrintWriter out, 
ResultDecorator preResultDecorator,
+            ResultDecorator postResultDecorator, ResultAppender 
handleAppender, ResultAppender statusAppender) {
+        this.config = config;
+        this.out = out;
+        this.preResultDecorator = preResultDecorator;
+        this.postResultDecorator = postResultDecorator;
+        this.handleAppender = handleAppender;
+        this.statusAppender = statusAppender;
+    }
+
+    /**
+     * Retrieve the PrintWriter to produce output to.
+     */
+    public PrintWriter out() {
+        return this.out;
+    }
+
+    public AlgebricksAppendable resultPrefix(AlgebricksAppendable app) throws 
AlgebricksException {
+        return this.preResultDecorator != null ? 
this.preResultDecorator.append(app) : app;
+    }
+
+    public AlgebricksAppendable resultPostfix(AlgebricksAppendable app) throws 
AlgebricksException {
+        return this.postResultDecorator != null ? 
this.postResultDecorator.append(app) : app;
+    }
+
+    public AlgebricksAppendable appendHandle(AlgebricksAppendable app, String 
handle) throws AlgebricksException {
+        return this.handleAppender != null ? this.handleAppender.append(app, 
handle) : app;
+    }
+
+    public AlgebricksAppendable appendStatus(AlgebricksAppendable app, String 
status) throws AlgebricksException {
+        return this.statusAppender != null ? this.statusAppender.append(app, 
status) : app;
+    }
+
+    public SessionConfig config() {
+        return config;
+    }
+
+    @FunctionalInterface
+    public interface ResultDecorator {
+        AlgebricksAppendable append(AlgebricksAppendable app) throws 
AlgebricksException;
+    }
+
+    @FunctionalInterface
+    public interface ResultAppender {
+        AlgebricksAppendable append(AlgebricksAppendable app, String str) 
throws AlgebricksException;
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index b12935d..5a66fd6 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -67,6 +67,7 @@
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.asterix.utils.ResourceUtils;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -152,31 +153,32 @@
         }
     }
 
-    private void printPlanPrefix(SessionConfig conf, String planName) {
-        if (conf.is(SessionConfig.FORMAT_HTML)) {
-            conf.out().println("<h4>" + planName + ":</h4>");
-            conf.out().println("<pre>");
+    private void printPlanPrefix(SessionOutput output, String planName) {
+        if (output.config().is(SessionConfig.FORMAT_HTML)) {
+            output.out().println("<h4>" + planName + ":</h4>");
+            output.out().println("<pre>");
         } else {
-            conf.out().println("----------" + planName + ":");
+            output.out().println("----------" + planName + ":");
         }
     }
 
-    private void printPlanPostfix(SessionConfig conf) {
-        if (conf.is(SessionConfig.FORMAT_HTML)) {
-            conf.out().println("</pre>");
+    private void printPlanPostfix(SessionOutput output) {
+        if (output.config().is(SessionConfig.FORMAT_HTML)) {
+            output.out().println("</pre>");
         }
     }
 
     public Pair<IReturningStatement, Integer> reWriteQuery(List<FunctionDecl> 
declaredFunctions,
-            MetadataProvider metadataProvider, IReturningStatement q, 
SessionConfig conf) throws CompilationException {
+            MetadataProvider metadataProvider, IReturningStatement q, 
SessionOutput output) throws CompilationException {
         if (q == null) {
             return null;
         }
+        SessionConfig conf = output.config();
         if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && 
conf.is(SessionConfig.OOB_EXPR_TREE)) {
-            conf.out().println();
-            printPlanPrefix(conf, "Expression tree");
-            q.accept(astPrintVisitorFactory.createLangVisitor(conf.out()), 0);
-            printPlanPostfix(conf);
+            output.out().println();
+            printPlanPrefix(output, "Expression tree");
+            q.accept(astPrintVisitorFactory.createLangVisitor(output.out()), 
0);
+            printPlanPostfix(output);
         }
         IQueryRewriter rw = rewriterFactory.createQueryRewriter();
         rw.rewrite(declaredFunctions, q, metadataProvider, new 
LangRewritingContext(q.getVarCounter()));
@@ -184,17 +186,18 @@
     }
 
     public JobSpecification compileQuery(IClusterInfoCollector 
clusterInfoCollector, MetadataProvider metadataProvider,
-            Query rwQ, int varCounter, String outputDatasetName, SessionConfig 
conf, ICompiledDmlStatement statement)
+            Query rwQ, int varCounter, String outputDatasetName, SessionOutput 
output, ICompiledDmlStatement statement)
             throws AlgebricksException, RemoteException, ACIDException {
 
+        SessionConfig conf = output.config();
         if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && 
conf.is(SessionConfig.OOB_REWRITTEN_EXPR_TREE)) {
-            conf.out().println();
+            output.out().println();
 
-            printPlanPrefix(conf, "Rewritten expression tree");
+            printPlanPrefix(output, "Rewritten expression tree");
             if (rwQ != null) {
-                
rwQ.accept(astPrintVisitorFactory.createLangVisitor(conf.out()), 0);
+                
rwQ.accept(astPrintVisitorFactory.createLangVisitor(output.out()), 0);
             }
-            printPlanPostfix(conf);
+            printPlanPostfix(output);
         }
 
         org.apache.asterix.common.transactions.JobId asterixJobId = 
JobIdFactory.generateJobId();
@@ -211,14 +214,14 @@
         }
 
         if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && 
conf.is(SessionConfig.OOB_LOGICAL_PLAN)) {
-            conf.out().println();
+            output.out().println();
 
-            printPlanPrefix(conf, "Logical plan");
+            printPlanPrefix(output, "Logical plan");
             if (rwQ != null || (statement != null && statement.getKind() == 
Statement.Kind.LOAD)) {
-                LogicalOperatorPrettyPrintVisitor pvisitor = new 
LogicalOperatorPrettyPrintVisitor(conf.out());
+                LogicalOperatorPrettyPrintVisitor pvisitor = new 
LogicalOperatorPrettyPrintVisitor(output.out());
                 PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
             }
-            printPlanPostfix(conf);
+            printPlanPostfix(output);
         }
         CompilerProperties compilerProperties = 
metadataProvider.getApplicationContext().getCompilerProperties();
         int frameSize = compilerProperties.getFrameSize();
@@ -264,15 +267,15 @@
             if (conf.is(SessionConfig.OOB_OPTIMIZED_LOGICAL_PLAN)) {
                 if (conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS)) {
                     // For Optimizer tests.
-                    AlgebricksAppendable buffer = new 
AlgebricksAppendable(conf.out());
+                    AlgebricksAppendable buffer = new 
AlgebricksAppendable(output.out());
                     PlanPrettyPrinter.printPhysicalOps(plan, buffer, 0);
                 } else {
-                    printPlanPrefix(conf, "Optimized logical plan");
+                    printPlanPrefix(output, "Optimized logical plan");
                     if (rwQ != null || (statement != null && 
statement.getKind() == Statement.Kind.LOAD)) {
-                        LogicalOperatorPrettyPrintVisitor pvisitor = new 
LogicalOperatorPrettyPrintVisitor(conf.out());
+                        LogicalOperatorPrettyPrintVisitor pvisitor = new 
LogicalOperatorPrettyPrintVisitor(output.out());
                         PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
                     }
-                    printPlanPostfix(conf);
+                    printPlanPostfix(output);
                 }
             }
         }
@@ -280,7 +283,7 @@
             try {
                 LogicalOperatorPrettyPrintVisitor pvisitor = new 
LogicalOperatorPrettyPrintVisitor();
                 PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
-                
ResultUtil.printResults(metadataProvider.getApplicationContext(), 
pvisitor.get().toString(), conf,
+                
ResultUtil.printResults(metadataProvider.getApplicationContext(), 
pvisitor.get().toString(), output,
                         new Stats(), null);
                 return null;
             } catch (IOException e) {
@@ -336,17 +339,17 @@
         }
 
         if (conf.is(SessionConfig.OOB_HYRACKS_JOB)) {
-            printPlanPrefix(conf, "Hyracks job");
+            printPlanPrefix(output, "Hyracks job");
             if (rwQ != null) {
                 try {
-                    conf.out().println(
+                    output.out().println(
                             new 
ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(spec.toJSON()));
                 } catch (IOException e) {
                     throw new AlgebricksException(e);
                 }
-                conf.out().println(spec.getUserConstraints());
+                output.out().println(spec.getUserConstraints());
             }
-            printPlanPostfix(conf);
+            printPlanPostfix(output);
         }
         return spec;
     }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
index a9c4b39..a4e72f7 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
@@ -26,7 +26,7 @@
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -35,7 +35,7 @@
 import org.apache.hyracks.http.server.AbstractServlet;
 
 public class AbstractQueryApiServlet extends AbstractServlet {
-    protected final ICcApplicationContext appCtx;
+    protected final IApplicationContext appCtx;
 
     public enum ResultFields {
         REQUEST_ID("requestID"),
@@ -93,7 +93,7 @@
         }
     }
 
-    AbstractQueryApiServlet(ICcApplicationContext appCtx, 
ConcurrentMap<String, Object> ctx, String[] paths) {
+    AbstractQueryApiServlet(IApplicationContext appCtx, ConcurrentMap<String, 
Object> ctx, String[] paths) {
         super(ctx, paths);
         this.appCtx = appCtx;
     }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index 3384332..7874aa3 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -50,6 +50,7 @@
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionConfig.OutputFormat;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.client.dataset.HyracksDataset;
@@ -137,19 +138,20 @@
             }
             IParser parser = parserFactory.createParser(query);
             List<Statement> aqlStatements = parser.parse();
-            SessionConfig sessionConfig = new SessionConfig(out, format, true, 
isSet(executeQuery), true);
+            SessionConfig sessionConfig = new SessionConfig(format, true, 
isSet(executeQuery), true);
             sessionConfig.set(SessionConfig.FORMAT_HTML, true);
             sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, csvAndHeader);
             sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, 
isSet(wrapperArray));
             sessionConfig.setOOBData(isSet(printExprParam), 
isSet(printRewrittenExprParam),
                     isSet(printLogicalPlanParam), 
isSet(printOptimizedLogicalPlanParam), isSet(printJob));
+            SessionOutput sessionOutput = new SessionOutput(sessionConfig, 
out);
             MetadataManager.INSTANCE.init();
-            IStatementExecutor translator = 
statementExectorFactory.create(appCtx, aqlStatements, sessionConfig,
+            IStatementExecutor translator = 
statementExectorFactory.create(appCtx, aqlStatements, sessionOutput,
                     compilationProvider, componentProvider);
             double duration;
             long startTime = System.currentTimeMillis();
             translator.compileAndExecute(hcc, hds, 
IStatementExecutor.ResultDelivery.IMMEDIATE,
-                    new IStatementExecutor.Stats());
+                    null, new IStatementExecutor.Stats());
             long endTime = System.currentTimeMillis();
             duration = (endTime - startTime) / 1000.00;
             out.println(HTML_STATEMENT_SEPARATOR);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index 401f55e..42e23ba 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -25,9 +25,9 @@
 
 import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.api.dataset.DatasetJobRecord;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -41,7 +41,7 @@
 public class QueryResultApiServlet extends AbstractQueryApiServlet {
     private static final Logger LOGGER = 
Logger.getLogger(QueryResultApiServlet.class.getName());
 
-    public QueryResultApiServlet(ConcurrentMap<String, Object> ctx, String[] 
paths, ICcApplicationContext appCtx) {
+    public QueryResultApiServlet(ConcurrentMap<String, Object> ctx, String[] 
paths, IApplicationContext appCtx) {
         super(appCtx, ctx, paths);
     }
 
@@ -94,8 +94,8 @@
             // way to send the same OutputFormat value here as was
             // originally determined there. Need to save this value on
             // some object that we can obtain here.
-            SessionConfig sessionConfig = RestApiServlet.initResponse(request, 
response);
-            ResultUtil.printResults(appCtx, resultReader, sessionConfig, new 
Stats(), null);
+            SessionOutput sessionOutput = RestApiServlet.initResponse(request, 
response);
+            ResultUtil.printResults(appCtx, resultReader, sessionOutput, new 
Stats(), null);
         } catch (HyracksDataException e) {
             final int errorCode = e.getErrorCode();
             if (ErrorCode.NO_RESULTSET == errorCode) {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 20bffc4..97890fd 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -23,22 +23,30 @@
 import java.io.StringWriter;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.api.http.ctx.StatementExecutorContext;
+import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.api.http.servlet.ServletConstants;
+import org.apache.asterix.app.message.ExecuteStatementRequestMessage;
+import org.apache.asterix.app.message.ExecuteStatementResponseMessage;
+import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.IClusterManagementWork;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.aql.parser.TokenMgrError;
 import org.apache.asterix.lang.common.base.IParser;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
@@ -46,8 +54,13 @@
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.asterix.translator.SessionConfig;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import 
org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
+import org.apache.asterix.translator.SessionOutput;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
 import org.apache.hyracks.http.server.utils.HttpUtil;
@@ -66,19 +79,24 @@
 
 public class QueryServiceServlet extends AbstractQueryApiServlet {
     private static final Logger LOGGER = 
Logger.getLogger(QueryServiceServlet.class.getName());
+    private final ILangExtension.Language queryLanguage;
     private final ILangCompilationProvider compilationProvider;
     private final IStatementExecutorFactory statementExecutorFactory;
     private final IStorageComponentProvider componentProvider;
-    private final IStatementExecutorContext queryCtx = new 
StatementExecutorContext();
+    private final IStatementExecutorContext queryCtx;
+    private final IServiceContext serviceCtx;
 
-    public QueryServiceServlet(ConcurrentMap<String, Object> ctx, String[] 
paths, ICcApplicationContext appCtx,
+    public QueryServiceServlet(ConcurrentMap<String, Object> ctx, String[] 
paths, IApplicationContext appCtx,
+            ILangExtension.Language queryLanguage,
             ILangCompilationProvider compilationProvider, 
IStatementExecutorFactory statementExecutorFactory,
             IStorageComponentProvider componentProvider) {
         super(appCtx, ctx, paths);
+        this.queryLanguage = queryLanguage;
         this.compilationProvider = compilationProvider;
         this.statementExecutorFactory = statementExecutorFactory;
         this.componentProvider = componentProvider;
-        ctx.put(ServletConstants.RUNNING_QUERIES_ATTR, queryCtx);
+        this.queryCtx = (IStatementExecutorContext) 
ctx.get(ServletConstants.RUNNING_QUERIES_ATTR);
+        this.serviceCtx = (IServiceContext) 
ctx.get(ServletConstants.SERVICE_CONTEXT_ATTR);
     }
 
     @Override
@@ -235,40 +253,22 @@
         return SessionConfig.OutputFormat.CLEAN_JSON;
     }
 
-    private static SessionConfig createSessionConfig(RequestParameters param, 
String handleUrl,
+    private static SessionOutput createSessionOutput(RequestParameters param, 
String handleUrl,
             PrintWriter resultWriter) {
-        SessionConfig.ResultDecorator resultPrefix = new 
SessionConfig.ResultDecorator() {
-            int resultNo = -1;
-
-            @Override
-            public AlgebricksAppendable append(AlgebricksAppendable app) 
throws AlgebricksException {
-                app.append("\t\"");
-                app.append(ResultFields.RESULTS.str());
-                if (resultNo >= 0) {
-                    app.append('-').append(String.valueOf(resultNo));
-                }
-                ++resultNo;
-                app.append("\": ");
-                return app;
-            }
-        };
-
-        SessionConfig.ResultDecorator resultPostfix = app -> 
app.append("\t,\n");
-        SessionConfig.ResultAppender appendHandle = (app, handle) -> 
app.append("\t\"")
-                .append(ResultFields.HANDLE.str()).append("\": 
\"").append(handleUrl).append(handle).append("\",\n");
-        SessionConfig.ResultAppender appendStatus = (app, status) -> 
app.append("\t\"")
-                .append(ResultFields.STATUS.str()).append("\": 
\"").append(status).append("\",\n");
+        SessionOutput.ResultDecorator resultPrefix = 
ResultUtil.createPreResultDecorator();
+        SessionOutput.ResultDecorator resultPostfix = 
ResultUtil.createPostResultDecorator();
+        SessionOutput.ResultAppender appendHandle = 
ResultUtil.createResultHandleAppender(handleUrl);
+        SessionOutput.ResultAppender appendStatus = 
ResultUtil.createResultStatusAppender();
 
         SessionConfig.OutputFormat format = getFormat(param.format);
-        SessionConfig sessionConfig =
-                new SessionConfig(resultWriter, format, resultPrefix, 
resultPostfix, appendHandle, appendStatus);
+        SessionConfig sessionConfig = new SessionConfig(format);
         sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, true);
         sessionConfig.set(SessionConfig.FORMAT_INDENT_JSON, param.pretty);
         sessionConfig.set(SessionConfig.FORMAT_QUOTE_RECORD,
                 format != SessionConfig.OutputFormat.CLEAN_JSON && format != 
SessionConfig.OutputFormat.LOSSLESS_JSON);
         sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, format == 
SessionConfig.OutputFormat.CSV
                 && "present".equals(getParameterValue(param.format, 
Attribute.HEADER.str())));
-        return sessionConfig;
+        return new SessionOutput(sessionConfig, resultWriter, resultPrefix, 
resultPostfix, appendHandle, appendStatus);
     }
 
     private static void printClientContextID(PrintWriter pw, RequestParameters 
params) {
@@ -406,7 +406,8 @@
         ResultDelivery delivery = parseResultDelivery(param.mode);
 
         String handleUrl = getHandleUrl(param.host, param.path, delivery);
-        SessionConfig sessionConfig = createSessionConfig(param, handleUrl, 
resultWriter);
+        SessionOutput sessionOutput = createSessionOutput(param, handleUrl, 
resultWriter);
+        SessionConfig sessionConfig = sessionOutput.config();
         HttpUtil.setContentType(response, 
HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
 
         HttpResponseStatus status = HttpResponseStatus.OK;
@@ -428,28 +429,77 @@
             if (param.statement == null || param.statement.isEmpty()) {
                 throw new AsterixException("Empty request, no statement 
provided");
             }
-            IParser parser = 
compilationProvider.getParserFactory().createParser(param.statement + ";");
-            List<Statement> statements = parser.parse();
-            MetadataManager.INSTANCE.init();
-            IStatementExecutor translator =
-                    statementExecutorFactory.create(appCtx, statements, 
sessionConfig, compilationProvider,
-                            componentProvider);
-            execStart = System.nanoTime();
-            translator.compileAndExecute(getHyracksClientConnection(), 
getHyracksDataset(), delivery, stats,
-                    param.clientContextID, queryCtx);
-            execEnd = System.nanoTime();
+
+            String statementsText = param.statement + ";";
+
+            if (statementExecutorFactory != null) {
+                // Running on CC -> execute directly
+                IParser parser = 
compilationProvider.getParserFactory().createParser(statementsText);
+                List<Statement> statements = parser.parse();
+                MetadataManager.INSTANCE.init();
+                IStatementExecutor translator = 
statementExecutorFactory.create((ICcApplicationContext) appCtx,
+                        statements, sessionOutput, compilationProvider, 
componentProvider);
+                execStart = System.nanoTime();
+                translator.compileAndExecute(getHyracksClientConnection(), 
getHyracksDataset(), delivery, null,
+                        stats, param.clientContextID, queryCtx);
+            } else {
+                // Running on NC -> send 'execute' message to CC
+                INCServiceContext ncCtx = (INCServiceContext) serviceCtx;
+                INCMessageBroker ncMb = (INCMessageBroker) 
ncCtx.getMessageBroker();
+                UUID ncMessageId = UUID.randomUUID();
+                ResultDelivery ccDelivery =
+                        delivery == ResultDelivery.IMMEDIATE ? 
ResultDelivery.DEFERRED : delivery;
+                ExecuteStatementRequestMessage requestMsg =
+                        new ExecuteStatementRequestMessage(ncCtx.getNodeId(), 
ncMessageId, queryLanguage,
+                                statementsText, sessionConfig, ccDelivery, 
param.clientContextID, handleUrl);
+                ExecuteStatementResponseMessage responseMsg;
+                Future<IMessage> responseFuture = 
ncMb.registerFuture(ncMessageId);
+                try {
+                    execStart = System.nanoTime();
+                    ncMb.sendMessageToCC(requestMsg);
+                    responseMsg = (ExecuteStatementResponseMessage) 
responseFuture.get(
+                            
ExecuteStatementResponseMessage.DEFAULT_TIMEOUT_MILLIS,
+                            java.util.concurrent.TimeUnit.MILLISECONDS);
+                    execStart = System.nanoTime();
+                } finally {
+                    ncMb.deregisterFuture(ncMessageId);
+                }
+
+                Throwable err = responseMsg.getError();
+                if (err != null) {
+                    if (err instanceof Exception) {
+                        throw (Exception)err;
+                    } else if (err instanceof TokenMgrError) {
+                        throw (TokenMgrError)err;
+                    } else if (err instanceof 
org.apache.asterix.aqlplus.parser.TokenMgrError) {
+                        throw 
(org.apache.asterix.aqlplus.parser.TokenMgrError)err;
+                    } else {
+                        throw new Exception(err.toString());
+                    }
+                }
+
+                IStatementExecutor.ResultMetadata resultMetadata = 
responseMsg.getMetadata();
+                if (delivery == ResultDelivery.IMMEDIATE && 
!resultMetadata.getResultSets().isEmpty()) {
+                    for (Triple<JobId, ResultSetId, ARecordType> rsmd : 
resultMetadata.getResultSets()) {
+                        ResultReader resultReader = new 
ResultReader(getHyracksDataset(), rsmd.getLeft(), rsmd.getMiddle());
+                        ResultUtil.printResults(appCtx, resultReader, 
sessionOutput, stats, rsmd.getRight());
+                    }
+                } else {
+                    sessionOutput.out().append(responseMsg.getResult());
+                }
+            }
             if (ResultDelivery.IMMEDIATE == delivery || 
ResultDelivery.DEFERRED == delivery) {
-                ResultUtil.printStatus(sessionConfig, ResultStatus.SUCCESS);
+                ResultUtil.printStatus(sessionOutput, ResultStatus.SUCCESS);
             }
         } catch (AsterixException | TokenMgrError | 
org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
             GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
             ResultUtil.printError(resultWriter, pe);
-            ResultUtil.printStatus(sessionConfig, ResultStatus.FATAL);
+            ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL);
             status = HttpResponseStatus.BAD_REQUEST;
         } catch (Exception e) {
             GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
             ResultUtil.printError(resultWriter, e);
-            ResultUtil.printStatus(sessionConfig, ResultStatus.FATAL);
+            ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL);
             status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
         } finally {
             if (execStart == -1) {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
index d0c574e..71dddc0 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
@@ -29,7 +29,7 @@
 
 import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.hyracks.api.dataset.DatasetJobRecord;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.http.api.IServletRequest;
@@ -41,7 +41,7 @@
 public class QueryStatusApiServlet extends AbstractQueryApiServlet {
     private static final Logger LOGGER = 
Logger.getLogger(QueryStatusApiServlet.class.getName());
 
-    public QueryStatusApiServlet(ConcurrentMap<String, Object> ctx, String[] 
paths, ICcApplicationContext appCtx) {
+    public QueryStatusApiServlet(ConcurrentMap<String, Object> ctx, String[] 
paths, IApplicationContext appCtx) {
         super(appCtx, ctx, paths);
     }
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index e339ba9..6b1e408 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -45,6 +45,7 @@
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionConfig.OutputFormat;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.client.dataset.HyracksDataset;
@@ -84,7 +85,7 @@
      * SessionConfig with the appropriate output writer and output-format
      * based on the Accept: header and other servlet parameters.
      */
-    static SessionConfig initResponse(IServletRequest request, 
IServletResponse response) throws IOException {
+    static SessionOutput initResponse(IServletRequest request, 
IServletResponse response) throws IOException {
         HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_PLAIN, 
HttpUtil.Encoding.UTF8);
         // CLEAN_JSON output is the default; most generally useful for a
         // programmatic HTTP API
@@ -114,9 +115,9 @@
             format = OutputFormat.LOSSLESS_JSON;
         }
 
-        SessionConfig.ResultAppender appendHandle = (app, handle) -> 
app.append("{ \"").append("handle")
+        SessionOutput.ResultAppender appendHandle = (app, handle) -> 
app.append("{ \"").append("handle")
                 .append("\":" + " \"").append(handle).append("\" }");
-        SessionConfig sessionConfig = new SessionConfig(response.writer(), 
format, null, null, appendHandle, null);
+        SessionConfig sessionConfig = new SessionConfig(format);
 
         // If it's JSON or ADM, check for the "wrapper-array" flag. Default is
         // "true" for JSON and "false" for ADM. (Not applicable for CSV.)
@@ -152,7 +153,7 @@
             default:
                 throw new IOException("Unknown format " + format);
         }
-        return sessionConfig;
+        return new SessionOutput(sessionConfig, response.writer(), null, null, 
appendHandle, null);
     }
 
     @Override
@@ -171,9 +172,9 @@
             // enable cross-origin resource sharing
             response.setHeader("Access-Control-Allow-Origin", "*");
             response.setHeader("Access-Control-Allow-Headers", "Origin, 
X-Requested-With, Content-Type, Accept");
-            SessionConfig sessionConfig = initResponse(request, response);
+            SessionOutput sessionOutput = initResponse(request, response);
             QueryTranslator.ResultDelivery resultDelivery = 
whichResultDelivery(request);
-            doHandle(response, query, sessionConfig, resultDelivery);
+            doHandle(response, query, sessionOutput, resultDelivery);
         } catch (Exception e) {
             response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
             LOGGER.log(Level.WARNING, "Failure handling request", e);
@@ -181,7 +182,7 @@
         }
     }
 
-    private void doHandle(IServletResponse response, String query, 
SessionConfig sessionConfig,
+    private void doHandle(IServletResponse response, String query, 
SessionOutput sessionOutput,
             ResultDelivery resultDelivery) throws JsonProcessingException {
         try {
             response.setStatus(HttpResponseStatus.OK);
@@ -201,20 +202,20 @@
             List<Statement> aqlStatements = parser.parse();
             validate(aqlStatements);
             MetadataManager.INSTANCE.init();
-            IStatementExecutor translator = 
statementExecutorFactory.create(appCtx, aqlStatements, sessionConfig,
+            IStatementExecutor translator = 
statementExecutorFactory.create(appCtx, aqlStatements, sessionOutput,
                     compilationProvider, componentProvider);
-            translator.compileAndExecute(hcc, hds, resultDelivery, new 
IStatementExecutor.Stats());
+            translator.compileAndExecute(hcc, hds, resultDelivery, null, new 
IStatementExecutor.Stats());
         } catch (AsterixException | TokenMgrError | 
org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
             response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
             GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
             String errorMessage = ResultUtil.buildParseExceptionMessage(pe, 
query);
             ObjectNode errorResp =
                     ResultUtil.getErrorResponse(2, errorMessage, "", 
ResultUtil.extractFullStackTrace(pe));
-            sessionConfig.out().write(new 
ObjectMapper().writeValueAsString(errorResp));
+            sessionOutput.out().write(new 
ObjectMapper().writeValueAsString(errorResp));
         } catch (Exception e) {
             GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
             response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
-            ResultUtil.apiErrorHandler(sessionConfig.out(), e);
+            ResultUtil.apiErrorHandler(sessionOutput.out(), e);
         }
     }
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
index fe6fa89..3bcd670 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
@@ -36,10 +36,10 @@
 import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultPrinter;
 import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.http.ParseException;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import 
org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
@@ -77,29 +77,29 @@
         return escaped;
     }
 
-    public static void printResults(ICcApplicationContext appCtx, ResultReader 
resultReader, SessionConfig conf,
+    public static void printResults(IApplicationContext appCtx, ResultReader 
resultReader, SessionOutput output,
             Stats stats, ARecordType recordType) throws HyracksDataException {
-        new ResultPrinter(appCtx, conf, stats, recordType).print(resultReader);
+        new ResultPrinter(appCtx, output, stats, 
recordType).print(resultReader);
     }
 
-    public static void printResults(ICcApplicationContext appCtx, String 
record, SessionConfig conf, Stats stats,
+    public static void printResults(IApplicationContext appCtx, String record, 
SessionOutput output, Stats stats,
             ARecordType recordType) throws HyracksDataException {
-        new ResultPrinter(appCtx, conf, stats, recordType).print(record);
+        new ResultPrinter(appCtx, output, stats, recordType).print(record);
     }
 
-    public static void printResultHandle(SessionConfig conf, ResultHandle 
handle) throws HyracksDataException {
+    public static void printResultHandle(SessionOutput output, ResultHandle 
handle) throws HyracksDataException {
         try {
-            final AlgebricksAppendable app = new 
AlgebricksAppendable(conf.out());
-            conf.appendHandle(app, handle.toString());
+            final AlgebricksAppendable app = new 
AlgebricksAppendable(output.out());
+            output.appendHandle(app, handle.toString());
         } catch (AlgebricksException e) {
             LOGGER.warn("error printing handle", e);
         }
     }
 
-    public static void printStatus(SessionConfig conf, 
AbstractQueryApiServlet.ResultStatus rs) {
+    public static void printStatus(SessionOutput output, 
AbstractQueryApiServlet.ResultStatus rs) {
         try {
-            final AlgebricksAppendable app = new 
AlgebricksAppendable(conf.out());
-            conf.appendStatus(app, rs.str());
+            final AlgebricksAppendable app = new 
AlgebricksAppendable(output.out());
+            output.appendStatus(app, rs.str());
         } catch (AlgebricksException e) {
             LOGGER.warn("error printing status", e);
         }
@@ -318,4 +318,35 @@
         return errorTemplate;
     }
 
+    public static SessionOutput.ResultDecorator createPreResultDecorator() {
+        return new SessionOutput.ResultDecorator() {
+            int resultNo = -1;
+
+            @Override
+            public AlgebricksAppendable append(AlgebricksAppendable app) 
throws AlgebricksException {
+                app.append("\t\"");
+                app.append(AbstractQueryApiServlet.ResultFields.RESULTS.str());
+                if (resultNo >= 0) {
+                    app.append('-').append(String.valueOf(resultNo));
+                }
+                ++resultNo;
+                app.append("\": ");
+                return app;
+            }
+        };
+    }
+
+    public static SessionOutput.ResultDecorator createPostResultDecorator() {
+        return app -> app.append("\t,\n");
+    }
+
+    public static SessionOutput.ResultAppender 
createResultHandleAppender(String handleUrl) {
+        return (app, handle) -> 
app.append("\t\"").append(AbstractQueryApiServlet.ResultFields.HANDLE.str())
+                .append("\": 
\"").append(handleUrl).append(handle).append("\",\n");
+    }
+
+    public static SessionOutput.ResultAppender createResultStatusAppender() {
+        return (app, status) -> 
app.append("\t\"").append(AbstractQueryApiServlet.ResultFields.STATUS.str())
+                .append("\": \"").append(status).append("\",\n");
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
index a9d4e22..9e1609d 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
@@ -24,6 +24,7 @@
     public static final String ASTERIX_APP_CONTEXT_INFO_ATTR = 
"org.apache.asterix.APP_CONTEXT_INFO";
     public static final String EXECUTOR_SERVICE_ATTR = 
"org.apache.asterix.EXECUTOR_SERVICE_ATTR";
     public static final String RUNNING_QUERIES_ATTR = 
"org.apache.asterix.RUNINNG_QUERIES";
+    public static final String SERVICE_CONTEXT_ATTR = 
"org.apache.asterix.SERVICE_CONTEXT";
 
     private ServletConstants() {
     }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
index ecf2c53..a9d24b9 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
@@ -36,6 +36,7 @@
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionConfig.OutputFormat;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.job.JobSpecification;
 
@@ -99,16 +100,17 @@
         List<Statement> statements = parser.parse();
         MetadataManager.INSTANCE.init();
 
-        SessionConfig conf = new SessionConfig(writer, OutputFormat.ADM, 
optimize, true, generateBinaryRuntime);
+        SessionConfig conf = new SessionConfig(OutputFormat.ADM, optimize, 
true, generateBinaryRuntime);
         conf.setOOBData(false, printRewrittenExpressions, printLogicalPlan, 
printOptimizedPlan, printJob);
         if (printPhysicalOpsOnly) {
             conf.set(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS, true);
         }
+        SessionOutput output = new SessionOutput(conf, writer);
 
-        IStatementExecutor translator = 
statementExecutorFactory.create(appCtx, statements, conf, compilationProvider,
+        IStatementExecutor translator = 
statementExecutorFactory.create(appCtx, statements, output, compilationProvider,
                 storageComponentProvider);
         translator.compileAndExecute(hcc, null, 
QueryTranslator.ResultDelivery.IMMEDIATE,
-                new IStatementExecutor.Stats());
+                null, new IStatementExecutor.Stats());
         writer.flush();
     }
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
index 6c6f2af..0c6b2cc 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
@@ -110,11 +110,11 @@
         return statementExecutorFactory;
     }
 
-    public ILangCompilationProvider getAqlCompilationProvider() {
-        return aqlCompilationProvider;
-    }
-
-    public ILangCompilationProvider getSqlppCompilationProvider() {
-        return sqlppCompilationProvider;
+    public ILangCompilationProvider getCompilationProvider(Language lang) {
+        switch (lang) {
+            case AQL: return aqlCompilationProvider;
+            case SQLPP: return sqlppCompilationProvider;
+            default: throw new IllegalArgumentException(String.valueOf(lang));
+        }
     }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
new file mode 100644
index 0000000..59d516f
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.app.message;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.api.http.server.ResultUtil;
+import org.apache.asterix.app.cc.CCExtensionManager;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.hyracks.bootstrap.CCApplication;
+import org.apache.asterix.lang.aql.parser.TokenMgrError;
+import org.apache.asterix.lang.common.base.IParser;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorContext;
+import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+
+public final class ExecuteStatementRequestMessage implements 
ICcAddressedMessage {
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOGGER = 
Logger.getLogger(ExecuteStatementRequestMessage.class.getName());
+
+    private final String requestNodeId;
+
+    private final UUID requestMessageId;
+
+    private final ILangExtension.Language lang;
+
+    private final String statementsText;
+
+    private final SessionConfig sessionConfig;
+
+    private final IStatementExecutor.ResultDelivery delivery;
+
+    private final String clientContextID;
+
+    private final String handleUrl;
+
+    public ExecuteStatementRequestMessage(String requestNodeId, UUID 
requestMessageId, ILangExtension.Language lang,
+            String statementsText, SessionConfig sessionConfig, 
IStatementExecutor.ResultDelivery delivery,
+            String clientContextID, String handleUrl) {
+        this.requestNodeId = requestNodeId;
+        this.requestMessageId = requestMessageId;
+        this.lang = lang;
+        this.statementsText = statementsText;
+        this.sessionConfig = sessionConfig;
+        this.delivery = delivery;
+        this.clientContextID = clientContextID;
+        this.handleUrl = handleUrl;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext ccAppCtx) throws 
HyracksDataException, InterruptedException {
+        ICCServiceContext ccSrvContext = ccAppCtx.getServiceContext();
+        ClusterControllerService ccSrv = (ClusterControllerService) 
ccSrvContext.getControllerService();
+        CCApplication ccApp = (CCApplication) ccSrv.getApplication();
+        CCMessageBroker messageBroker = (CCMessageBroker) 
ccSrvContext.getMessageBroker();
+        CCExtensionManager ccExtMgr = (CCExtensionManager) 
ccAppCtx.getExtensionManager();
+        ILangCompilationProvider compilationProvider = 
ccExtMgr.getCompilationProvider(lang);
+        IStorageComponentProvider storageComponentProvider = 
ccAppCtx.getStorageComponentProvider();
+        IStatementExecutorFactory statementExecutorFactory = 
ccApp.getStatementExecutorFactory();
+        IStatementExecutorContext statementExecutorContext = 
ccApp.getStatementExecutorContext();
+
+        ccSrv.getExecutor().submit(() -> {
+            ExecuteStatementResponseMessage responseMsg = new 
ExecuteStatementResponseMessage(requestMessageId);
+
+            try {
+                IParser parser = 
compilationProvider.getParserFactory().createParser(statementsText);
+                List<Statement> statements = parser.parse();
+
+                StringWriter outWriter = new StringWriter(256);
+                PrintWriter outPrinter = new PrintWriter(outWriter);
+                SessionOutput.ResultDecorator resultPrefix = 
ResultUtil.createPreResultDecorator();
+                SessionOutput.ResultDecorator resultPostfix = 
ResultUtil.createPostResultDecorator();
+                SessionOutput.ResultAppender appendHandle = 
ResultUtil.createResultHandleAppender(handleUrl);
+                SessionOutput.ResultAppender appendStatus = 
ResultUtil.createResultStatusAppender();
+                SessionOutput sessionOutput = new SessionOutput(sessionConfig, 
outPrinter, resultPrefix, resultPostfix,
+                        appendHandle, appendStatus);
+
+                IStatementExecutor.ResultMetadata outMetadata = new 
IStatementExecutor.ResultMetadata();
+
+                MetadataManager.INSTANCE.init();
+                IStatementExecutor translator = 
statementExecutorFactory.create(ccAppCtx, statements, sessionOutput,
+                        compilationProvider, storageComponentProvider);
+                translator.compileAndExecute(ccAppCtx.getHcc(), null, 
delivery, outMetadata,
+                        new IStatementExecutor.Stats(), clientContextID, 
statementExecutorContext);
+
+                outPrinter.close();
+                responseMsg.setResult(outWriter.toString());
+                responseMsg.setMetadata(outMetadata);
+            } catch (TokenMgrError | 
org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
+                GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), 
pe);
+                responseMsg.setError(pe);
+            } catch (AsterixException pe) {
+                GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), 
pe);
+                responseMsg.setError(new AsterixException(pe.getMessage()));
+            } catch (Exception e) {
+                GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), 
e);
+                responseMsg.setError(new Exception(e.toString()));
+            }
+
+            try {
+                messageBroker.sendApplicationMessageToNC(responseMsg, 
requestNodeId);
+            } catch (Exception e) {
+                LOGGER.log(Level.WARNING, e.getMessage(), e);
+            }
+        });
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s(id=%s, from=%s): %s", 
getClass().getSimpleName(), requestMessageId, requestNodeId,
+                statementsText);
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
new file mode 100644
index 0000000..9bd058a
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.app.message;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.messages.IMessage;
+
+public final class ExecuteStatementResponseMessage implements 
INcAddressedMessage {
+    private static final long serialVersionUID = 1L;
+
+    public static final long DEFAULT_TIMEOUT_MILLIS = 
TimeUnit.MINUTES.toMillis(5);
+
+    private final UUID requestMessageId;
+
+    private String result;
+
+    private IStatementExecutor.ResultMetadata metadata;
+
+    private Throwable error;
+
+    public ExecuteStatementResponseMessage(UUID requestMessageId) {
+        this.requestMessageId = requestMessageId;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) throws 
HyracksDataException, InterruptedException {
+        NCMessageBroker mb = (NCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
+        CompletableFuture<IMessage> result = 
mb.deregisterFuture(requestMessageId);
+        if (result != null) {
+            result.complete(this);
+        }
+    }
+
+    public Throwable getError() {
+        return error;
+    }
+
+    public void setError(Throwable error) {
+        this.error = error;
+    }
+
+    public String getResult() {
+        return result;
+    }
+
+    public void setResult(String result) {
+        this.result = result;
+    }
+
+    public IStatementExecutor.ResultMetadata getMetadata() {
+        return metadata;
+    }
+
+    public void setMetadata(IStatementExecutor.ResultMetadata metadata) {
+        this.metadata = metadata;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s(id=%s): %d characters", 
getClass().getSimpleName(), requestMessageId,
+                result != null ? result.length() : 0);
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
index 7ed3aef..452d13e 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
@@ -24,10 +24,11 @@
 import java.io.StringWriter;
 import java.nio.ByteBuffer;
 
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import 
org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
 import org.apache.hyracks.api.comm.IFrame;
@@ -47,6 +48,7 @@
 
     private final FrameManager resultDisplayFrameMgr;
 
+    private final SessionOutput output;
     private final SessionConfig conf;
     private final Stats stats;
     private final ARecordType recordType;
@@ -62,8 +64,9 @@
     private ObjectMapper om;
     private ObjectWriter ow;
 
-    public ResultPrinter(ICcApplicationContext appCtx, SessionConfig conf, 
Stats stats, ARecordType recordType) {
-        this.conf = conf;
+    public ResultPrinter(IApplicationContext appCtx, SessionOutput output, 
Stats stats, ARecordType recordType) {
+        this.output = output;
+        this.conf = output.config();
         this.stats = stats;
         this.recordType = recordType;
         this.indentJSON = conf.is(SessionConfig.FORMAT_INDENT_JSON);
@@ -112,18 +115,18 @@
         // If we're outputting CSV with a header, the HTML header was already
         // output by displayCSVHeader(), so skip it here
         if (conf.is(SessionConfig.FORMAT_HTML)) {
-            conf.out().println("<h4>Results:</h4>");
-            conf.out().println("<pre class=\"result-content\">");
+            output.out().println("<h4>Results:</h4>");
+            output.out().println("<pre class=\"result-content\">");
         }
 
         try {
-            conf.resultPrefix(new AlgebricksAppendable(conf.out()));
+            output.resultPrefix(new AlgebricksAppendable(output.out()));
         } catch (AlgebricksException e) {
             throw new HyracksDataException(e);
         }
 
         if (conf.is(SessionConfig.FORMAT_WRAPPER_ARRAY)) {
-            conf.out().print("[ ");
+            output.out().print("[ ");
             wrapArray = true;
         }
 
@@ -134,29 +137,29 @@
             if (quoteRecord) {
                 StringWriter sw = new StringWriter();
                 appendCSVHeader(sw, recordType);
-                conf.out().print(JSONUtil.quoteAndEscape(sw.toString()));
-                conf.out().print("\n");
+                output.out().print(JSONUtil.quoteAndEscape(sw.toString()));
+                output.out().print("\n");
                 notFirst = true;
             } else {
-                appendCSVHeader(conf.out(), recordType);
+                appendCSVHeader(output.out(), recordType);
             }
         }
     }
 
     private void printPostfix() throws HyracksDataException {
-        conf.out().flush();
+        output.out().flush();
         if (wrapArray) {
-            conf.out().println(" ]");
+            output.out().println(" ]");
         }
         try {
-            conf.resultPostfix(new AlgebricksAppendable(conf.out()));
+            output.resultPostfix(new AlgebricksAppendable(output.out()));
         } catch (AlgebricksException e) {
             throw new HyracksDataException(e);
         }
         if (conf.is(SessionConfig.FORMAT_HTML)) {
-            conf.out().println("</pre>");
+            output.out().println("</pre>");
         }
-        conf.out().flush();
+        output.out().flush();
     }
 
     private void displayRecord(String result) throws HyracksDataException {
@@ -177,7 +180,7 @@
             // TODO(tillw): this is inefficient as well
             record = JSONUtil.quoteAndEscape(record);
         }
-        conf.out().print(record);
+        output.out().print(record);
         stats.setCount(stats.getCount() + 1);
         // TODO(tillw) fix this approximation
         stats.setSize(stats.getSize() + record.length());
@@ -211,7 +214,7 @@
                 }
                 String result = new String(frameBytes, start, length, UTF_8);
                 if (wrapArray && notFirst) {
-                    conf.out().print(", ");
+                    output.out().print(", ");
                 }
                 notFirst = true;
                 displayRecord(result);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
index e2f17ac..00fb1b0 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
@@ -28,7 +28,7 @@
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorFactory;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
 
 public class DefaultStatementExecutorFactory implements 
IStatementExecutorFactory {
@@ -48,9 +48,9 @@
     }
 
     @Override
-    public IStatementExecutor create(ICcApplicationContext appCtx, 
List<Statement> statements, SessionConfig conf,
+    public IStatementExecutor create(ICcApplicationContext appCtx, 
List<Statement> statements, SessionOutput output,
             ILangCompilationProvider compilationProvider, 
IStorageComponentProvider storageComponentProvider) {
-        return new QueryTranslator(appCtx, statements, conf, 
compilationProvider, storageComponentProvider,
+        return new QueryTranslator(appCtx, statements, output, 
compilationProvider, storageComponentProvider,
                 executorService);
     }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index e97a7e1..46ece3e 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -161,6 +161,7 @@
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.asterix.translator.TypeTranslator;
 import org.apache.asterix.translator.util.ValidateUtil;
 import org.apache.asterix.utils.DataverseUtil;
@@ -170,6 +171,7 @@
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.commons.lang3.tuple.Triple;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -203,6 +205,7 @@
     public static final boolean IS_DEBUG_MODE = false;// true
     protected final List<Statement> statements;
     protected final ICcApplicationContext appCtx;
+    protected final SessionOutput sessionOutput;
     protected final SessionConfig sessionConfig;
     protected Dataverse activeDataverse;
     protected final List<FunctionDecl> declaredFunctions;
@@ -211,12 +214,13 @@
     protected final IStorageComponentProvider componentProvider;
     protected final ExecutorService executorService;
 
-    public QueryTranslator(ICcApplicationContext appCtx, List<Statement> 
statements, SessionConfig conf,
+    public QueryTranslator(ICcApplicationContext appCtx, List<Statement> 
statements, SessionOutput output,
             ILangCompilationProvider compliationProvider, 
IStorageComponentProvider componentProvider,
             ExecutorService executorService) {
         this.appCtx = appCtx;
         this.statements = statements;
-        this.sessionConfig = conf;
+        this.sessionOutput = output;
+        this.sessionConfig = output.config();
         this.componentProvider = componentProvider;
         declaredFunctions = getDeclaredFunctions(statements);
         apiFramework = new APIFramework(compliationProvider);
@@ -241,13 +245,13 @@
 
     @Override
     public void compileAndExecute(IHyracksClientConnection hcc, 
IHyracksDataset hdc, ResultDelivery resultDelivery,
-            Stats stats) throws Exception {
-        compileAndExecute(hcc, hdc, resultDelivery, stats, null, null);
+            ResultMetadata outMetadata, Stats stats) throws Exception {
+        compileAndExecute(hcc, hdc, resultDelivery, outMetadata, stats, null, 
null);
     }
 
     @Override
     public void compileAndExecute(IHyracksClientConnection hcc, 
IHyracksDataset hdc, ResultDelivery resultDelivery,
-            Stats stats, String clientContextId, IStatementExecutorContext 
ctx) throws Exception {
+            ResultMetadata outMetadata, Stats stats, String clientContextId, 
IStatementExecutorContext ctx) throws Exception {
         int resultSetIdCounter = 0;
         FileSplit outputFile = null;
         IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
@@ -262,7 +266,7 @@
         try {
             for (Statement stmt : statements) {
                 if (sessionConfig.is(SessionConfig.FORMAT_HTML)) {
-                    
sessionConfig.out().println(ApiServlet.HTML_STATEMENT_SEPARATOR);
+                    
sessionOutput.out().println(ApiServlet.HTML_STATEMENT_SEPARATOR);
                 }
                 validateOperation(appCtx, activeDataverse, stmt);
                 rewriteStatement(stmt); // Rewrite the statement's AST.
@@ -324,8 +328,8 @@
                             metadataProvider.setResultAsyncMode(resultDelivery 
== ResultDelivery.ASYNC
                                     || resultDelivery == 
ResultDelivery.DEFERRED);
                         }
-                        handleInsertUpsertStatement(metadataProvider, stmt, 
hcc, hdc, resultDelivery, stats, false,
-                                clientContextId, ctx);
+                        handleInsertUpsertStatement(metadataProvider, stmt, 
hcc, hdc, resultDelivery, outMetadata,
+                                stats,false, clientContextId, ctx);
                         break;
                     case Statement.Kind.DELETE:
                         handleDeleteStatement(metadataProvider, stmt, hcc, 
false);
@@ -358,8 +362,8 @@
                         metadataProvider.setResultSetId(new 
ResultSetId(resultSetIdCounter++));
                         metadataProvider.setResultAsyncMode(
                                 resultDelivery == ResultDelivery.ASYNC || 
resultDelivery == ResultDelivery.DEFERRED);
-                        handleQuery(metadataProvider, (Query) stmt, hcc, hdc, 
resultDelivery, stats, clientContextId,
-                                ctx);
+                        handleQuery(metadataProvider, (Query) stmt, hcc, hdc, 
resultDelivery, outMetadata, stats,
+                                clientContextId, ctx);
                         break;
                     case Statement.Kind.COMPACT:
                         handleCompactStatement(metadataProvider, stmt, hcc);
@@ -379,8 +383,8 @@
                         // No op
                         break;
                     case Statement.Kind.EXTENSION:
-                        ((IExtensionStatement) stmt).handle(this, 
metadataProvider, hcc, hdc, resultDelivery, stats,
-                                resultSetIdCounter);
+                        ((IExtensionStatement) stmt).handle(this, 
metadataProvider, hcc, hdc, resultDelivery,
+                                outMetadata, resultSetIdCounter);
                         break;
                     default:
                         throw new CompilationException("Unknown function");
@@ -1694,7 +1698,7 @@
             CompiledLoadFromFileStatement cls =
                     new CompiledLoadFromFileStatement(dataverseName, 
loadStmt.getDatasetName().getValue(),
                             loadStmt.getAdapter(), loadStmt.getProperties(), 
loadStmt.dataIsAlreadySorted());
-            JobSpecification spec = apiFramework.compileQuery(hcc, 
metadataProvider, null, 0, null, sessionConfig, cls);
+            JobSpecification spec = apiFramework.compileQuery(hcc, 
metadataProvider, null, 0, null, sessionOutput, cls);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             if (spec != null) {
@@ -1712,8 +1716,8 @@
 
     public JobSpecification handleInsertUpsertStatement(MetadataProvider 
metadataProvider, Statement stmt,
             IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery 
resultDelivery,
-            IStatementExecutor.Stats stats, boolean compileOnly, String 
clientContextId, IStatementExecutorContext ctx)
-            throws Exception {
+            ResultMetadata outMetadata, Stats stats, boolean compileOnly, 
String clientContextId,
+            IStatementExecutorContext ctx) throws Exception {
         InsertStatement stmtInsertUpsert = (InsertStatement) stmt;
         String dataverseName = 
getActiveDataverse(stmtInsertUpsert.getDataverseName());
         final IMetadataLocker locker = new IMetadataLocker() {
@@ -1755,7 +1759,8 @@
         }
 
         if (stmtInsertUpsert.getReturnExpression() != null) {
-            deliverResult(hcc, hdc, compiler, metadataProvider, locker, 
resultDelivery, stats, clientContextId, ctx);
+            deliverResult(hcc, hdc, compiler, metadataProvider, locker, 
resultDelivery, outMetadata, stats,
+                    clientContextId, ctx);
         } else {
             locker.lock();
             try {
@@ -1811,11 +1816,11 @@
 
         // Query Rewriting (happens under the same ongoing metadata 
transaction)
         Pair<IReturningStatement, Integer> rewrittenResult =
-                apiFramework.reWriteQuery(declaredFunctions, metadataProvider, 
query, sessionConfig);
+                apiFramework.reWriteQuery(declaredFunctions, metadataProvider, 
query, sessionOutput);
 
         // Query Compilation (happens under the same ongoing metadata 
transaction)
         return apiFramework.compileQuery(clusterInfoCollector, 
metadataProvider, (Query) rewrittenResult.first,
-                rewrittenResult.second, stmt == null ? null : 
stmt.getDatasetName(), sessionConfig, stmt);
+                rewrittenResult.second, stmt == null ? null : 
stmt.getDatasetName(), sessionOutput, stmt);
     }
 
     private JobSpecification rewriteCompileInsertUpsert(IClusterInfoCollector 
clusterInfoCollector,
@@ -1824,7 +1829,7 @@
 
         // Insert/upsert statement rewriting (happens under the same ongoing 
metadata transaction)
         Pair<IReturningStatement, Integer> rewrittenResult =
-                apiFramework.reWriteQuery(declaredFunctions, metadataProvider, 
insertUpsert, sessionConfig);
+                apiFramework.reWriteQuery(declaredFunctions, metadataProvider, 
insertUpsert, sessionOutput);
 
         InsertStatement rewrittenInsertUpsert = (InsertStatement) 
rewrittenResult.first;
         String dataverseName = 
getActiveDataverse(rewrittenInsertUpsert.getDataverseName());
@@ -1846,7 +1851,7 @@
         }
         // Insert/upsert statement compilation (happens under the same ongoing 
metadata transaction)
         return apiFramework.compileQuery(clusterInfoCollector, 
metadataProvider, rewrittenInsertUpsert.getQuery(),
-                rewrittenResult.second, datasetName, sessionConfig, clfrqs);
+                rewrittenResult.second, datasetName, sessionOutput, clfrqs);
     }
 
     protected void handleCreateFeedStatement(MetadataProvider 
metadataProvider, Statement stmt) throws Exception {
@@ -2052,7 +2057,7 @@
                 datasets.add(ds);
             }
             org.apache.commons.lang3.tuple.Pair<JobSpecification, 
AlgebricksAbsolutePartitionConstraint> jobInfo =
-                    FeedOperations.buildStartFeedJob(sessionConfig, 
metadataProvider, feed, feedConnections,
+                    FeedOperations.buildStartFeedJob(sessionOutput, 
metadataProvider, feed, feedConnections,
                             compilationProvider, storageComponentProvider, 
qtFactory, hcc);
 
             JobSpecification feedJob = jobInfo.getLeft();
@@ -2287,8 +2292,8 @@
     }
 
     protected void handleQuery(MetadataProvider metadataProvider, Query query, 
IHyracksClientConnection hcc,
-            IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats, 
String clientContextId,
-            IStatementExecutorContext ctx) throws Exception {
+            IHyracksDataset hdc, ResultDelivery resultDelivery, ResultMetadata 
outMetadata, Stats stats,
+            String clientContextId, IStatementExecutorContext ctx) throws 
Exception {
         final IMetadataLocker locker = new IMetadataLocker() {
             @Override
             public void lock() {
@@ -2318,12 +2323,12 @@
                 throw e;
             }
         };
-        deliverResult(hcc, hdc, compiler, metadataProvider, locker, 
resultDelivery, stats, clientContextId, ctx);
+        deliverResult(hcc, hdc, compiler, metadataProvider, locker, 
resultDelivery, outMetadata, stats, clientContextId, ctx);
     }
 
     private void deliverResult(IHyracksClientConnection hcc, IHyracksDataset 
hdc, IStatementCompiler compiler,
-            MetadataProvider metadataProvider, IMetadataLocker locker, 
ResultDelivery resultDelivery, Stats stats,
-            String clientContextId, IStatementExecutorContext ctx) throws 
Exception {
+            MetadataProvider metadataProvider, IMetadataLocker locker, 
ResultDelivery resultDelivery, ResultMetadata outMetadata,
+            Stats stats, String clientContextId, IStatementExecutorContext 
ctx) throws Exception {
         final ResultSetId resultSetId = metadataProvider.getResultSetId();
         switch (resultDelivery) {
             case ASYNC:
@@ -2339,13 +2344,17 @@
             case IMMEDIATE:
                 createAndRunJob(hcc, null, compiler, locker, resultDelivery, 
id -> {
                     final ResultReader resultReader = new ResultReader(hdc, 
id, resultSetId);
-                    ResultUtil.printResults(appCtx, resultReader, 
sessionConfig, stats,
-                            metadataProvider.findOutputRecordType());
+                    ARecordType recordType = 
metadataProvider.findOutputRecordType();
+                    ResultUtil.printResults(appCtx, resultReader, 
sessionOutput, stats, recordType);
                 }, clientContextId, ctx);
                 break;
             case DEFERRED:
                 createAndRunJob(hcc, null, compiler, locker, resultDelivery, 
id -> {
-                    ResultUtil.printResultHandle(sessionConfig, new 
ResultHandle(id, resultSetId));
+                    ResultHandle resultHandle = new ResultHandle(id, 
resultSetId);
+                    ResultUtil.printResultHandle(sessionOutput, resultHandle);
+                    if (outMetadata != null) {
+                        outMetadata.getResultSets().add(Triple.of(id, 
resultSetId, metadataProvider.findOutputRecordType()));
+                    }
                 }, clientContextId, ctx);
                 break;
             default:
@@ -2360,8 +2369,8 @@
         try {
             createAndRunJob(hcc, jobId, compiler, locker, resultDelivery, id 
-> {
                 final ResultHandle handle = new ResultHandle(id, resultSetId);
-                ResultUtil.printStatus(sessionConfig, 
AbstractQueryApiServlet.ResultStatus.RUNNING);
-                ResultUtil.printResultHandle(sessionConfig, handle);
+                ResultUtil.printStatus(sessionOutput, 
AbstractQueryApiServlet.ResultStatus.RUNNING);
+                ResultUtil.printResultHandle(sessionOutput, handle);
                 synchronized (printed) {
                     printed.setTrue();
                     printed.notify();
@@ -2370,8 +2379,8 @@
         } catch (Exception e) {
             if (JobId.INVALID.equals(jobId.getValue())) {
                 // compilation failed
-                ResultUtil.printStatus(sessionConfig, 
AbstractQueryApiServlet.ResultStatus.FAILED);
-                ResultUtil.printError(sessionConfig.out(), e);
+                ResultUtil.printStatus(sessionOutput, 
AbstractQueryApiServlet.ResultStatus.FAILED);
+                ResultUtil.printError(sessionOutput.out(), e);
             } else {
                 GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE,
                         resultDelivery.name() + " job with id " + 
jobId.getValue() + " " + "failed", e);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 9c66f57..87a237d 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -19,17 +19,19 @@
 
 package org.apache.asterix.hyracks.bootstrap;
 
+import static org.apache.asterix.algebra.base.ILangExtension.Language.AQL;
+import static org.apache.asterix.algebra.base.ILangExtension.Language.SQLPP;
 import static 
org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
 import static 
org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.api.http.ctx.StatementExecutorContext;
 import org.apache.asterix.api.http.server.ApiServlet;
 import org.apache.asterix.api.http.server.ClusterApiServlet;
 import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet;
@@ -73,6 +75,7 @@
 import org.apache.asterix.runtime.job.resource.JobCapacityController;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
@@ -95,6 +98,7 @@
     protected ICCServiceContext ccServiceCtx;
     protected CCExtensionManager ccExtensionManager;
     protected IStorageComponentProvider componentProvider;
+    protected StatementExecutorContext statementExecutorCtx;
     protected WebManager webManager;
     protected CcApplicationContext appCtx;
     private IJobCapacityController jobCapacityController;
@@ -124,9 +128,10 @@
         ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
         componentProvider = new StorageComponentProvider();
         GlobalRecoveryManager.instantiate((HyracksConnection) getHcc(), 
componentProvider);
+        statementExecutorCtx = new StatementExecutorContext();
         appCtx = new CcApplicationContext(ccServiceCtx, getHcc(), 
libraryManager, resourceIdManager,
                 () -> MetadataManager.INSTANCE, 
GlobalRecoveryManager.instance(), ftStrategy,
-                new ActiveLifecycleListener());
+                new ActiveLifecycleListener(), componentProvider);
         ClusterStateManager.INSTANCE.setCcAppCtx(appCtx);
         ccExtensionManager = new CCExtensionManager(getExtensions());
         appCtx.setExtensionManager(ccExtensionManager);
@@ -184,7 +189,7 @@
         IHyracksClientConnection hcc = getHcc();
         webServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
         webServer.addServlet(new ApiServlet(webServer.ctx(), new String[] { 
"/*" }, appCtx,
-                ccExtensionManager.getAqlCompilationProvider(), 
ccExtensionManager.getSqlppCompilationProvider(),
+                ccExtensionManager.getCompilationProvider(AQL), 
ccExtensionManager.getCompilationProvider(SQLPP),
                 getStatementExecutorFactory(), componentProvider));
         return webServer;
     }
@@ -197,6 +202,8 @@
         jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, appCtx);
         jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR,
                 ccServiceCtx.getControllerService().getExecutor());
+        jsonAPIServer.setAttribute(ServletConstants.RUNNING_QUERIES_ATTR, 
statementExecutorCtx);
+        jsonAPIServer.setAttribute(ServletConstants.SERVICE_CONTEXT_ATTR, 
ccServiceCtx);
 
         // AQL rest APIs.
         addServlet(jsonAPIServer, Servlets.AQL_QUERY);
@@ -241,28 +248,28 @@
     protected IServlet createServlet(ConcurrentMap<String, Object> ctx, String 
key, String... paths) {
         switch (key) {
             case Servlets.AQL:
-                return new FullApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getAqlCompilationProvider(),
+                return new FullApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getCompilationProvider(AQL),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.AQL_QUERY:
-                return new QueryApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getAqlCompilationProvider(),
+                return new QueryApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getCompilationProvider(AQL),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.AQL_UPDATE:
-                return new UpdateApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getAqlCompilationProvider(),
+                return new UpdateApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getCompilationProvider(AQL),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.AQL_DDL:
-                return new DdlApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getAqlCompilationProvider(),
+                return new DdlApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getCompilationProvider(AQL),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.SQLPP:
-                return new FullApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getSqlppCompilationProvider(),
+                return new FullApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getCompilationProvider(SQLPP),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.SQLPP_QUERY:
-                return new QueryApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getSqlppCompilationProvider(),
+                return new QueryApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getCompilationProvider(SQLPP),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.SQLPP_UPDATE:
-                return new UpdateApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getSqlppCompilationProvider(),
+                return new UpdateApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getCompilationProvider(SQLPP),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.SQLPP_DDL:
-                return new DdlApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getSqlppCompilationProvider(),
+                return new DdlApiServlet(ctx, paths, appCtx, 
ccExtensionManager.getCompilationProvider(SQLPP),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.RUNNING_REQUESTS:
                 return new QueryCancellationServlet(ctx, paths);
@@ -271,7 +278,7 @@
             case Servlets.QUERY_RESULT:
                 return new QueryResultApiServlet(ctx, paths, appCtx);
             case Servlets.QUERY_SERVICE:
-                return new QueryServiceServlet(ctx, paths, appCtx, 
ccExtensionManager.getSqlppCompilationProvider(),
+                return new QueryServiceServlet(ctx, paths, appCtx, SQLPP, 
ccExtensionManager.getCompilationProvider(SQLPP),
                         getStatementExecutorFactory(), componentProvider);
             case Servlets.CONNECTOR:
                 return new ConnectorApiServlet(ctx, paths, appCtx);
@@ -292,13 +299,17 @@
         }
     }
 
-    private IStatementExecutorFactory getStatementExecutorFactory() {
+    public IStatementExecutorFactory getStatementExecutorFactory() {
         return 
ccExtensionManager.getStatementExecutorFactory(ccServiceCtx.getControllerService().getExecutor());
+    }
+
+    public IStatementExecutorContext getStatementExecutorContext() {
+        return statementExecutorCtx;
     }
 
     @Override
     public void startupCompleted() throws Exception {
-        ccServiceCtx.getControllerService().getExecutor().submit((Callable) () 
-> {
+        ccServiceCtx.getControllerService().getExecutor().submit(() -> {
             ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE);
             
ClusterManagerProvider.getClusterManager().notifyStartupCompleted();
             return null;
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 9c24acf..c7b354e 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -49,21 +49,26 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.config.IConfigManager;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
 import org.apache.hyracks.api.messages.IMessageBroker;
 import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.nc.BaseNCApplication;
 import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.http.server.WebManager;
 
 public class NCApplication extends BaseNCApplication {
     private static final Logger LOGGER = 
Logger.getLogger(NCApplication.class.getName());
 
-    private INCServiceContext ncServiceCtx;
+    protected INCServiceContext ncServiceCtx;
     private INcApplicationContext runtimeContext;
     private String nodeId;
     private boolean stopInitiated = false;
     private SystemState systemState;
+    protected WebManager webManager;
 
     @Override
     public void registerConfig(IConfigManager configManager) {
@@ -122,6 +127,8 @@
             
localResourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName());
         }
 
+        webManager = new WebManager();
+
         performLocalCleanUp();
     }
 
@@ -129,6 +136,9 @@
     protected void configureLoggingLevel(Level level) {
         super.configureLoggingLevel(level);
         Logger.getLogger("org.apache.asterix").setLevel(level);
+    }
+
+    protected void configureServers() throws Exception {
     }
 
     protected List<AsterixExtension> getExtensions() {
@@ -143,6 +153,9 @@
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("Stopping Asterix node controller: " + nodeId);
             }
+
+            webManager.stop();
+
             //Clean any temporary files
             performLocalCleanUp();
 
@@ -163,6 +176,10 @@
 
     @Override
     public void startupCompleted() throws Exception {
+        // configure servlets after joining the cluster, so we can create 
HyracksClientConnection
+        configureServers();
+        webManager.start();
+
         // Since we don't pass initial run flag in 
AsterixHyracksIntegrationUtil, we use the virtualNC flag
         final NodeProperties nodeProperties = 
runtimeContext.getNodeProperties();
         if (systemState == SystemState.PERMANENT_DATA_LOSS
@@ -262,4 +279,10 @@
     public INcApplicationContext getApplicationContext() {
         return runtimeContext;
     }
+
+    protected IHyracksClientConnection getHcc() throws Exception {
+        NodeControllerService ncSrv = (NodeControllerService) 
ncServiceCtx.getControllerService();
+        ClusterControllerInfo ccInfo = 
ncSrv.getNodeParameters().getClusterControllerInfo();
+        return new HyracksConnection(ccInfo.getClientNetAddress(), 
ccInfo.getClientNetPort());
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 630aabe..5e9bcc0 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -20,6 +20,10 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -44,6 +48,7 @@
     private final LinkedBlockingQueue<INcAddressedMessage> receivedMsgsQ;
     private final ConcurrentFramePool messagingFramePool;
     private final int maxMsgSize;
+    private final Map<UUID, CompletableFuture<IMessage>> futureMap;
 
     public NCMessageBroker(NodeControllerService ncs, MessagingProperties 
messagingProperties) {
         this.ncs = ncs;
@@ -53,6 +58,7 @@
         messagingFramePool = new ConcurrentFramePool(ncs.getId(), 
messagingMemoryBudget,
                 messagingProperties.getFrameSize());
         receivedMsgsQ = new LinkedBlockingQueue<>();
+        futureMap = new HashMap<>();
         MessageDeliveryService msgDeliverySvc = new MessageDeliveryService();
         appContext.getThreadExecutor().execute(msgDeliverySvc);
     }
@@ -104,6 +110,32 @@
         ccb.getWriteInterface().getFullBufferAcceptor().accept(msgBuffer);
     }
 
+    @Override
+    public CompletableFuture<IMessage> registerFuture(UUID msgId) {
+        if (msgId == null) {
+            throw new NullPointerException();
+        }
+
+        synchronized (futureMap) {
+            if (futureMap.containsKey(msgId)) {
+                throw new IllegalArgumentException(String.valueOf(msgId));
+            }
+            CompletableFuture<IMessage> future = new CompletableFuture<>();
+            futureMap.put(msgId, future);
+            return future;
+        }
+    }
+
+    @Override
+    public CompletableFuture<IMessage> deregisterFuture(UUID msgId) {
+        if (msgId == null) {
+            throw new NullPointerException();
+        }
+        synchronized (futureMap) {
+            return futureMap.remove(msgId);
+        }
+    }
+
     private class MessageDeliveryService implements Runnable {
         /*
          * TODO Currently this thread is not stopped when it is interrupted 
because
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index bffaeef..d1ff871 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -69,7 +69,7 @@
 import org.apache.asterix.runtime.utils.RuntimeUtils;
 import org.apache.asterix.translator.CompiledStatements;
 import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -151,7 +151,7 @@
         return spec;
     }
 
-    private static JobSpecification getConnectionJob(SessionConfig 
sessionConfig, MetadataProvider metadataProvider,
+    private static JobSpecification getConnectionJob(SessionOutput 
sessionOutput, MetadataProvider metadataProvider,
             FeedConnection feedConnection, String[] locations, 
ILangCompilationProvider compilationProvider,
             IStorageComponentProvider storageComponentProvider, 
DefaultStatementExecutorFactory qtFactory,
             IHyracksClientConnection hcc) throws AlgebricksException, 
RemoteException, ACIDException {
@@ -165,7 +165,7 @@
         statements.add(dataverseDecl);
         statements.add(subscribeStmt);
         IStatementExecutor translator = 
qtFactory.create(metadataProvider.getApplicationContext(), statements,
-                sessionConfig, compilationProvider, storageComponentProvider);
+                sessionOutput, compilationProvider, storageComponentProvider);
         // configure the metadata provider
         
metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, "" + 
Boolean.TRUE);
         metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME, 
"" + subscribeStmt.getPolicy());
@@ -357,7 +357,7 @@
     }
 
     public static Pair<JobSpecification, 
AlgebricksAbsolutePartitionConstraint> buildStartFeedJob(
-            SessionConfig sessionConfig, MetadataProvider metadataProvider, 
Feed feed,
+            SessionOutput sessionOutput, MetadataProvider metadataProvider, 
Feed feed,
             List<FeedConnection> feedConnections, ILangCompilationProvider 
compilationProvider,
             IStorageComponentProvider storageComponentProvider, 
DefaultStatementExecutorFactory qtFactory,
             IHyracksClientConnection hcc) throws Exception {
@@ -372,7 +372,7 @@
         String[] ingestionLocations = 
ingestionAdaptorFactory.getPartitionConstraint().getLocations();
         // Add connection job
         for (FeedConnection feedConnection : feedConnections) {
-            JobSpecification connectionJob = getConnectionJob(sessionConfig, 
metadataProvider, feedConnection,
+            JobSpecification connectionJob = getConnectionJob(sessionOutput, 
metadataProvider, feedConnection,
                     ingestionLocations, compilationProvider, 
storageComponentProvider, qtFactory, hcc);
             jobsList.add(connectionJob);
         }
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
index e2885b3..5e3da5f 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
@@ -39,7 +39,7 @@
 import org.apache.asterix.lang.common.statement.RunStatement;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -51,7 +51,7 @@
     @Test
     public void test() throws Exception {
         List<Statement> statements = new ArrayList<>();
-        SessionConfig mockSessionConfig = mock(SessionConfig.class);
+        SessionOutput mockSessionOutput = mock(SessionOutput.class);
         RunStatement mockRunStatement = mock(RunStatement.class);
 
         // Mocks AppContextInfo.
@@ -70,7 +70,7 @@
         when(mockMasterNode.getClientIp()).thenReturn("127.0.0.1");
 
         IStatementExecutor aqlTranslator = new 
DefaultStatementExecutorFactory().create(mockAsterixAppContextInfo,
-                statements, mockSessionConfig, new AqlCompilationProvider(), 
new StorageComponentProvider());
+                statements, mockSessionOutput, new AqlCompilationProvider(), 
new StorageComponentProvider());
         List<String> parameters = new ArrayList<>();
         parameters.add("examples/pregelix-example-jar-with-dependencies.jar");
         parameters.add("org.apache.pregelix.example.PageRankVertex");
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 8457681..02b7d20 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -32,11 +32,13 @@
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.Inet4Address;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -117,17 +119,24 @@
     /*
      * Instance members
      */
-    protected final String host;
-    protected final int port;
+    protected final List<InetSocketAddress> endpoints;
+    protected int endpointSelector;
     protected ITestLibrarian librarian;
-
-    public TestExecutor(String host, int port) {
-        this.host = host;
-        this.port = port;
-    }
 
     public TestExecutor() {
         this(Inet4Address.getLoopbackAddress().getHostAddress(), 19002);
+    }
+
+    public TestExecutor(String host, int port) {
+        this(InetSocketAddress.createUnresolved(host, port));
+    }
+
+    public TestExecutor(InetSocketAddress endpoint) {
+        this(Collections.singletonList(endpoint));
+    }
+
+    public TestExecutor(List<InetSocketAddress> endpoints) {
+        this.endpoints = endpoints;
     }
 
     public void setLibrarian(ITestLibrarian librarian) {
@@ -367,7 +376,7 @@
                     continue;
                 }
                 throw new Exception(
-                        "Result for " + scriptFile + ": expected pattern '" + 
expression + "' not found in result.");
+                        "Result for " + scriptFile + ": expected pattern '" + 
expression + "' not found in result: "+actual);
             }
         } catch (Exception e) {
             System.err.println("Actual results file: " + 
actualFile.toString());
@@ -1122,7 +1131,7 @@
 
     protected InputStream executeHttp(String ctxType, String endpoint, 
OutputFormat fmt) throws Exception {
         String[] split = endpoint.split("\\?");
-        URI uri = new URI("http", null, host, port, split[0], split.length > 1 
? split[1] : null, null);
+        URI uri = createEndpointURI(split[0], split.length > 1 ? split[1] : 
null);
         return executeURI(ctxType, uri, fmt);
     }
 
@@ -1141,7 +1150,7 @@
         //get node process id
         OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
         String endpoint = "/admin/cluster/node/" + nodeId + "/config";
-        InputStream executeJSONGet = executeJSONGet(fmt, new URI("http", null, 
host, port, endpoint, null, null));
+        InputStream executeJSONGet = executeJSONGet(fmt, 
createEndpointURI(endpoint, null));
         StringWriter actual = new StringWriter();
         IOUtils.copy(executeJSONGet, actual, StandardCharsets.UTF_8);
         String config = actual.toString();
@@ -1158,7 +1167,7 @@
     private void deleteNCTxnLogs(String nodeId, CompilationUnit cUnit) throws 
Exception {
         OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
         String endpoint = "/admin/cluster/node/" + nodeId + "/config";
-        InputStream executeJSONGet = executeJSONGet(fmt, new URI("http://"; + 
host + ":" + port + endpoint));
+        InputStream executeJSONGet = executeJSONGet(fmt, 
createEndpointURI(endpoint, null));
         StringWriter actual = new StringWriter();
         IOUtils.copy(executeJSONGet, actual, StandardCharsets.UTF_8);
         String config = actual.toString();
@@ -1237,8 +1246,16 @@
                         + "_" + cUnit.getName() + "_qbc.adm");
     }
 
+    protected URI createEndpointURI(String path, String query) throws 
URISyntaxException {
+        int endpointIdx = Math.abs(endpointSelector++ % endpoints.size());
+        InetSocketAddress endpoint = endpoints.get(endpointIdx);
+        URI uri = new URI("http", null, endpoint.getHostString(), 
endpoint.getPort(), path, query, null);
+        LOGGER.fine("Created endpoint URI: " + uri);
+        return uri;
+    }
+
     protected URI getEndpoint(String servlet) throws URISyntaxException {
-        return new URI("http", null, host, port, 
getPath(servlet).replaceAll("/\\*$", ""), null, null);
+        return createEndpointURI(getPath(servlet).replaceAll("/\\*$", ""), 
null);
     }
 
     public static String stripJavaComments(String text) {
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
new file mode 100644
index 0000000..9a2ad3c
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
@@ -0,0 +1,52 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-group name="async-deferred">
+    <test-case FilePath="async-deferred">
+        <compilation-unit name="async-failed">
+            <output-dir compare="Text">async-failed</output-dir>
+            <expected-error>Injected failure in 
asterix:inject-failure</expected-error>
+        </compilation-unit>
+    </test-case>
+    <test-case FilePath="async-deferred">
+        <compilation-unit name="async-compilation-failed">
+            <output-dir compare="Text">async-compilation-failed</output-dir>
+            <expected-error>Cannot find dataset gargel</expected-error>
+        </compilation-unit>
+    </test-case>
+    <test-case FilePath="async-deferred">
+        <compilation-unit name="deferred">
+            <output-dir compare="Text">deferred</output-dir>
+        </compilation-unit>
+    </test-case>
+    <test-case FilePath="async-deferred">
+        <compilation-unit name="async">
+            <output-dir compare="Text">async</output-dir>
+        </compilation-unit>
+    </test-case>
+    <test-case FilePath="async-deferred">
+        <compilation-unit name="async-repeated">
+            <output-dir compare="Text">async-repeated</output-dir>
+        </compilation-unit>
+    </test-case>
+    <test-case FilePath="async-deferred">
+        <compilation-unit name="async-running">
+            <output-dir compare="Text">async-running</output-dir>
+        </compilation-unit>
+    </test-case>
+</test-group>
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index c2b0951..e9de123 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -18,43 +18,10 @@
  !-->
 <!DOCTYPE test-suite [
   <!ENTITY RecordsQueries SYSTEM "queries_sqlpp/objects/ObjectsQueries.xml">
-
+  <!ENTITY AsyncDeferredQueries SYSTEM 
"queries_sqlpp/async-deferred/AsyncDeferredQueries.xml">
 ]>
 <test-suite xmlns="urn:xml.testframework.asterix.apache.org" 
ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" 
QueryFileExtension=".sqlpp">
-  <test-group name="async-deferred">
-    <test-case FilePath="async-deferred">
-      <compilation-unit name="async-failed">
-        <output-dir compare="Text">async-failed</output-dir>
-        <expected-error>Injected failure in 
asterix:inject-failure</expected-error>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="async-deferred">
-      <compilation-unit name="async-compilation-failed">
-        <output-dir compare="Text">async-compilation-failed</output-dir>
-        <expected-error>Cannot find dataset gargel</expected-error>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="async-deferred">
-      <compilation-unit name="deferred">
-        <output-dir compare="Text">deferred</output-dir>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="async-deferred">
-      <compilation-unit name="async">
-        <output-dir compare="Text">async</output-dir>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="async-deferred">
-      <compilation-unit name="async-repeated">
-        <output-dir compare="Text">async-repeated</output-dir>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="async-deferred">
-      <compilation-unit name="async-running">
-        <output-dir compare="Text">async-running</output-dir>
-      </compilation-unit>
-    </test-case>
-  </test-group>
+  &AsyncDeferredQueries;
   <test-group name="flwor">
     <test-case FilePath="flwor">
       <compilation-unit name="at00">
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index a9e6448..4bc73bd 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -20,6 +20,7 @@
 
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
+import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.transactions.IResourceIdManager;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -73,4 +74,8 @@
     public IHyracksClientConnection getHcc();
 
     public IResourceIdManager getResourceIdManager();
+
+    public IStorageComponentProvider getStorageComponentProvider();
+
+    public Object getExtensionManager();
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
index e1101b3..58dc48b 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
@@ -18,6 +18,10 @@
  */
 package org.apache.asterix.common.messaging.api;
 
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.messages.IMessageBroker;
 
 public interface INCMessageBroker extends IMessageBroker {
@@ -26,7 +30,6 @@
      * Sends application message from this NC to the CC.
      *
      * @param message
-     * @param callback
      * @throws Exception
      */
     public void sendMessageToCC(ICcAddressedMessage message) throws Exception;
@@ -35,7 +38,6 @@
      * Sends application message from this NC to another NC.
      *
      * @param message
-     * @param callback
      * @throws Exception
      */
     public void sendMessageToNC(String nodeId, INcAddressedMessage message)
@@ -47,4 +49,21 @@
      * @param msg
      */
     public void queueReceivedMessage(INcAddressedMessage msg);
+
+    /**
+     * Creates and registers a Future for a message that will be send through 
this broker
+     * @param msgId message id
+     * @return new Future
+     * @throws NullPointerException if {@code msdId} is {@code null}
+     * @throws IllegalArgumentException if there is already a Future 
associated with given message id
+     */
+    CompletableFuture<IMessage> registerFuture(UUID msgId);
+
+    /**
+     * Removes a previously registered Future
+     * @param msgId message id
+     * @return existing Future or {@code null} if there was no Future 
associated with this message id
+     * @throws NullPointerException if {@code msdId} is {@code null}
+     */
+    CompletableFuture<IMessage> deregisterFuture(UUID msgId);
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 8608d68..a4c271c 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -34,6 +34,7 @@
 import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.config.TransactionProperties;
+import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.library.ILibraryManager;
@@ -54,6 +55,7 @@
 public class CcApplicationContext implements ICcApplicationContext {
 
     private ICCServiceContext ccServiceCtx;
+    private IStorageComponentProvider storageComponentProvider;
     private IGlobalRecoveryManager globalRecoveryManager;
     private ILibraryManager libraryManager;
     private IResourceIdManager resourceIdManager;
@@ -77,7 +79,8 @@
     public CcApplicationContext(ICCServiceContext ccServiceCtx, 
IHyracksClientConnection hcc,
             ILibraryManager libraryManager, IResourceIdManager 
resourceIdManager,
             Supplier<IMetadataBootstrap> metadataBootstrapSupplier, 
IGlobalRecoveryManager globalRecoveryManager,
-            IFaultToleranceStrategy ftStrategy, IJobLifecycleListener 
activeLifeCycleListener)
+            IFaultToleranceStrategy ftStrategy, IJobLifecycleListener 
activeLifeCycleListener,
+            IStorageComponentProvider storageComponentProvider)
             throws AsterixException, IOException {
         this.ccServiceCtx = ccServiceCtx;
         this.hcc = hcc;
@@ -102,6 +105,7 @@
         this.nodeProperties = new NodeProperties(propertiesAccessor);
         this.metadataBootstrapSupplier = metadataBootstrapSupplier;
         this.globalRecoveryManager = globalRecoveryManager;
+        this.storageComponentProvider = storageComponentProvider;
     }
 
     @Override
@@ -174,6 +178,7 @@
         return libraryManager;
     }
 
+    @Override
     public Object getExtensionManager() {
         return extensionManager;
     }
@@ -213,4 +218,9 @@
     public IJobLifecycleListener getActiveLifecycleListener() {
         return activeLifeCycleListener;
     }
+
+    @Override
+    public IStorageComponentProvider getStorageComponentProvider() {
+        return storageComponentProvider;
+    }
 }
diff --git 
a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
 
b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
index f34dc6a..ebd945e 100644
--- 
a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
+++ 
b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
@@ -20,14 +20,29 @@
 
 import java.io.File;
 
+import javax.xml.XMLConstants;
 import javax.xml.bind.JAXBContext;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.parsers.SAXParser;
+import javax.xml.parsers.SAXParserFactory;
+import javax.xml.transform.sax.SAXSource;
+
+import org.xml.sax.InputSource;
 
 public class TestSuiteParser {
     public TestSuiteParser() {
     }
 
     public org.apache.asterix.testframework.xml.TestSuite parse(File 
testSuiteCatalog) throws Exception {
+        SAXParserFactory saxParserFactory = SAXParserFactory.newInstance();
+        saxParserFactory.setNamespaceAware(true);
+        saxParserFactory.setXIncludeAware(true);
+        SAXParser saxParser = saxParserFactory.newSAXParser();
+        saxParser.setProperty(XMLConstants.ACCESS_EXTERNAL_DTD, "file");
+
         JAXBContext ctx = 
JAXBContext.newInstance(org.apache.asterix.testframework.xml.TestSuite.class);
-        return (org.apache.asterix.testframework.xml.TestSuite) 
ctx.createUnmarshaller().unmarshal(testSuiteCatalog);
+        Unmarshaller um = ctx.createUnmarshaller();
+        return (org.apache.asterix.testframework.xml.TestSuite) 
um.unmarshal(new SAXSource(saxParser.getXMLReader(),
+                new InputSource(testSuiteCatalog.toURI().toString())));
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index cdfc492..3000a75 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.http.server;
 
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -47,6 +48,8 @@
     // Constants
     private static final int LOW_WRITE_BUFFER_WATER_MARK = 8 * 1024;
     private static final int HIGH_WRITE_BUFFER_WATER_MARK = 32 * 1024;
+    static final WriteBufferWaterMark WRITE_BUFFER_WATER_MARK =
+            new WriteBufferWaterMark(LOW_WRITE_BUFFER_WATER_MARK, 
HIGH_WRITE_BUFFER_WATER_MARK);
     private static final Logger LOGGER = 
Logger.getLogger(HttpServer.class.getName());
     private static final int FAILED = -1;
     private static final int STOPPED = 0;
@@ -61,6 +64,7 @@
     private final EventLoopGroup bossGroup;
     private final EventLoopGroup workerGroup;
     private final int port;
+    private final InetSocketAddress proxyToAddress;
     private final ExecutorService executor;
     // Mutable members
     private volatile int state = STOPPED;
@@ -73,9 +77,15 @@
 
     public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, 
int port, int numExecutorThreads,
             int requestQueueSize) {
+        this(bossGroup, workerGroup, port, numExecutorThreads, 
requestQueueSize, null);
+    }
+
+    public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup, 
int port, int numExecutorThreads,
+            int requestQueueSize, InetSocketAddress proxyToAddress) {
         this.bossGroup = bossGroup;
         this.workerGroup = workerGroup;
         this.port = port;
+        this.proxyToAddress = proxyToAddress;
         ctx = new ConcurrentHashMap<>();
         servlets = new ArrayList<>();
         executor = new ThreadPoolExecutor(numExecutorThreads, 
numExecutorThreads, 0L, TimeUnit.MILLISECONDS,
@@ -192,8 +202,7 @@
         Collections.sort(servlets, (l1, l2) -> l2.getPaths()[0].length() - 
l1.getPaths()[0].length());
         ServerBootstrap b = new ServerBootstrap();
         b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
-                .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
-                        new WriteBufferWaterMark(LOW_WRITE_BUFFER_WATER_MARK, 
HIGH_WRITE_BUFFER_WATER_MARK))
+                .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, 
WRITE_BUFFER_WATER_MARK)
                 .handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new 
HttpServerInitializer(this));
         channel = b.bind(port).sync().channel();
     }
@@ -225,7 +234,6 @@
                 }
             }
         }
-        LOGGER.warning("No servlet for " + uri);
         return null;
     }
 
@@ -257,4 +265,12 @@
     public ExecutorService getExecutor() {
         return executor;
     }
+
+    public EventLoopGroup getWorkerGroup() {
+        return workerGroup;
+    }
+
+    InetSocketAddress getProxyToAddress() {
+        return proxyToAddress;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index 6f7ccba..de267ba 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.http.server;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -27,12 +28,20 @@
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.server.utils.HttpUtil;
 
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.DefaultHttpResponse;
 import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpRequestEncoder;
+import io.netty.handler.codec.http.HttpResponseDecoder;
 import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
 
 public class HttpServerHandler extends SimpleChannelInboundHandler<Object> {
 
@@ -64,19 +73,33 @@
         FullHttpRequest request = (FullHttpRequest) msg;
         try {
             IServlet servlet = server.getServlet(request);
-            if (servlet == null) {
-                respond(ctx, request, HttpResponseStatus.NOT_FOUND);
-            } else {
+            if (servlet != null) {
                 submit(ctx, servlet, request);
+                return;
             }
+
+            InetSocketAddress proxyToAddress = server.getProxyToAddress();
+            if (proxyToAddress != null) {
+                if (LOGGER.isLoggable(Level.FINE)) {
+                    LOGGER.fine("Proxy " + request.uri() + " to " + 
proxyToAddress);
+                }
+                proxy(ctx, request, proxyToAddress);
+                return;
+            }
+
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("No servlet for " + request.uri());
+            }
+            respond(ctx, request.protocolVersion(), 
HttpResponseStatus.NOT_FOUND);
+
         } catch (Exception e) {
             LOGGER.log(Level.SEVERE, "Failure Submitting HTTP Request", e);
-            respond(ctx, request, HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            respond(ctx, request.protocolVersion(), 
HttpResponseStatus.INTERNAL_SERVER_ERROR);
         }
     }
 
-    private void respond(ChannelHandlerContext ctx, FullHttpRequest request, 
HttpResponseStatus status) {
-        DefaultHttpResponse response = new 
DefaultHttpResponse(request.protocolVersion(), status);
+    private void respond(ChannelHandlerContext ctx, HttpVersion httpVersion, 
HttpResponseStatus status) {
+        DefaultHttpResponse response = new DefaultHttpResponse(httpVersion, 
status);
         ctx.write(response).addListener(ChannelFutureListener.CLOSE);
     }
 
@@ -86,7 +109,7 @@
             servletRequest = HttpUtil.toServletRequest(request);
         } catch (IllegalArgumentException e) {
             LOGGER.log(Level.WARNING, "Failure Decoding Request", e);
-            respond(ctx, request, HttpResponseStatus.BAD_REQUEST);
+            respond(ctx, request.protocolVersion(), 
HttpResponseStatus.BAD_REQUEST);
             return;
         }
         handler = new HttpRequestHandler(ctx, servlet, servletRequest, 
chunkSize);
@@ -102,6 +125,52 @@
         }
     }
 
+    private void proxy(ChannelHandlerContext ctx, FullHttpRequest request, 
InetSocketAddress toAddress) {
+        HttpVersion requestHttpVersion = request.protocolVersion();
+        Channel clientChannel = ctx.channel();
+
+        Bootstrap b = new Bootstrap();
+        b.group(server.getWorkerGroup());
+        b.channel(clientChannel.getClass());
+        b.option(ChannelOption.WRITE_BUFFER_WATER_MARK, 
HttpServer.WRITE_BUFFER_WATER_MARK);
+        b.option(ChannelOption.AUTO_READ, false);
+        b.handler(new ChannelInitializer<Channel>() {
+            @Override
+            protected void initChannel(Channel proxyChannel) throws Exception {
+                ChannelPipeline p = proxyChannel.pipeline();
+                p.addLast(new HttpRequestEncoder());
+                p.addLast(new HttpResponseDecoder());
+                p.addLast(new ProxyHandler(clientChannel));
+            }
+        });
+
+        request.retain();
+
+        b.connect(toAddress).addListener((ChannelFutureListener) 
proxyConnectFuture -> {
+            if (proxyConnectFuture.isSuccess()) {
+                proxyConnectFuture.channel().writeAndFlush(request)
+                        .addListener((ChannelFutureListener) proxyWriteFuture 
-> {
+                            Channel proxyChannel = proxyWriteFuture.channel();
+                            if (proxyWriteFuture.isSuccess()) {
+                                proxyChannel.read();
+                            } else {
+                                proxyChannel.close();
+                                if (LOGGER.isLoggable(Level.FINE)) {
+                                    LOGGER.fine("[Proxy] Failure writing to " 
+ toAddress);
+                                }
+                                respond(ctx, requestHttpVersion, 
HttpResponseStatus.NOT_FOUND);
+                            }
+                        });
+            } else {
+                request.release();
+                if (LOGGER.isLoggable(Level.FINE)) {
+                    LOGGER.fine("[Proxy] Failure connecting to " + toAddress);
+                }
+                respond(ctx, requestHttpVersion, HttpResponseStatus.NOT_FOUND);
+            }
+        });
+    }
+
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
         LOGGER.log(Level.SEVERE, "Failure handling HTTP Request", cause);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ProxyHandler.java
 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ProxyHandler.java
new file mode 100644
index 0000000..6abe6a7
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ProxyHandler.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.http.server;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+final class ProxyHandler extends ChannelInboundHandlerAdapter {
+
+    private static final Logger LOGGER = 
Logger.getLogger(ProxyHandler.class.getName());
+
+    private final Channel clientChannel;
+
+    ProxyHandler(Channel clientChannel) {
+        this.clientChannel = clientChannel;
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext proxyCtx, Object msg) throws 
Exception {
+        clientChannel.writeAndFlush(msg).addListener((ChannelFutureListener) 
clientWriteFuture -> {
+            if (clientWriteFuture.isSuccess()) {
+                proxyCtx.channel().read();
+            } else {
+                clientChannel.close();
+            }
+        });
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext proxyCtx) throws 
Exception {
+        closeClientChannel();
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext proxyCtx, Throwable 
cause) throws Exception {
+        LOGGER.log(Level.FINE, "[Proxy] Failure in proxy channel", cause);
+        proxyCtx.channel().close();
+        closeClientChannel();
+    }
+
+    private void closeClientChannel() {
+        if (clientChannel.isActive()) {
+            
clientChannel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+        }
+    }
+}

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1709
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I19414a23e163fc4deef9805c8f9089609f1ebe07
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Till Westmann <[email protected]>

Reply via email to