Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1867

Change subject: [ASTERIXDB-1973][OTR] Make IExtensionStatement#handle extensible
......................................................................

[ASTERIXDB-1973][OTR] Make IExtensionStatement#handle extensible

- user model changes: no
- storage format changes: no
- interface changes:
Introduce IRequestContext to encapsulate request parameters
and use it in IStatementExecutor#compileAndExecute and
IExtensionStatement#handle.

Details:
- Introduce IRequestContext and its default implementation.
- Make the handling of IExtensionStatement in IStatementExecutor extensible.
- Include HTTP headers in IRequestContext since extensions might need
them to pass information to extension statements
(e.g., authentication headers).
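
Illustrative only (this mirrors the servlet changes in this patch): callers
now build a single context object instead of choosing between the two
previous compileAndExecute overloads. All variables below are assumed to be
in scope at the call site:

  IRequestContext requestContext = new RequestContext(httpHeaders, hcc, hdc,
          resultDelivery, stats, outMetadata, clientContextId, executorCtx);
  statementExecutor.compileAndExecute(requestContext);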

Change-Id: Ie918f4d3f8dae41d07536041c591c59946a077f4
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
A asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestContext.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
A asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/RequestContext.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
12 files changed, 460 insertions(+), 262 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/67/1867/1

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
index b0f863b..29d5a58 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
@@ -20,12 +20,9 @@
 
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.IRequestContext;
 import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
-import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
@@ -42,14 +39,13 @@
      * Called when the {@code IStatementExecutor} encounters an extension statement.
      * An implementation class should implement the actual processing of the statement in this method.
      *
-     * @param queryTranslator
-     * @param metadataProvider
+     * @param requestContext
      * @param statementExecutor
-     * @param hcc
-     * @param resultSetIdCounter
-     * @throws Exception
+     * @param metadataProvider
+     * @param resultSetId
+     * @throws HyracksDataException
+     * @throws AlgebricksException
      */
-    void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
-            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
-            int resultSetIdCounter) throws HyracksDataException, AlgebricksException;
-}
+    void handle(IRequestContext requestContext, IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
+            int resultSetId) throws HyracksDataException, AlgebricksException;
+}
\ No newline at end of file
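
For illustration, a minimal sketch of an extension statement written against
the new signature, along the lines of the authentication use case mentioned
in the commit message. The class name and header constant are hypothetical,
and the class is left abstract so the remaining Statement methods can be
omitted:

  import java.util.Map;

  import org.apache.asterix.algebra.extension.IExtensionStatement;
  import org.apache.asterix.metadata.declared.MetadataProvider;
  import org.apache.asterix.translator.IRequestContext;
  import org.apache.asterix.translator.IStatementExecutor;
  import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
  import org.apache.hyracks.api.exceptions.HyracksDataException;

  public abstract class AuthCheckedExtensionStatement implements IExtensionStatement {

      // Hypothetical header carrying the caller's credentials.
      private static final String AUTH_HEADER = "Authorization";

      @Override
      public void handle(IRequestContext requestContext, IStatementExecutor statementExecutor,
              MetadataProvider metadataProvider, int resultSetId)
              throws HyracksDataException, AlgebricksException {
          // getHttpHeaders() returns null for non-HTTP callers, so guard the lookup.
          Map<String, String> headers = requestContext.getHttpHeaders();
          String token = headers == null ? null : headers.get(AUTH_HEADER);
          if (token == null) {
              throw new AlgebricksException("Missing " + AUTH_HEADER + " header");
          }
          // Actual statement processing goes here, using the executor and metadata provider.
      }
  }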
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestContext.java
new file mode 100644
index 0000000..0665420
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.translator;
+
+import java.util.Map;
+
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+
+public interface IRequestContext {
+
+    /**
+     * @return A Hyracks client connection that is used to submit a jobspec to Hyracks.
+     */
+    IHyracksClientConnection getHyracksClientConnection();
+
+    /**
+     * @return A Hyracks dataset client object that is used to read the results.
+     */
+    IHyracksDataset getHyracksDataset();
+
+    /**
+     * @return The {@code ResultDelivery} kind required for queries in the list of statements
+     */
+    IStatementExecutor.ResultDelivery getResultDelivery();
+
+    /**
+     * @return a reference to write the stats of executed queries
+     */
+    Stats getStats();
+
+    /**
+     * @return a reference to write the metadata of executed queries
+     */
+    IStatementExecutor.ResultMetadata getOutMetadata();
+
+    /**
+     * @return the client context id for the query
+     */
+    String getClientContextId();
+
+    /**
+     * @return a reference for the statement executor for this request
+     */
+    IStatementExecutorContext getStatementExecutorContext();
+
+    /**
+     * @return The headers of the HTTP request if the request is coming from the web. Otherwise null.
+     */
+    Map<String, String> getHttpHeaders();
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 19f0dcc..7ec5aa9 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -32,8 +32,6 @@
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IClusterInfoCollector;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
@@ -94,45 +92,11 @@
     }
 
     /**
-     * Compiles and execute a list of statements, without passing in client context id and context.
-     *
-     * @param hcc
-     *            A Hyracks client connection that is used to submit a jobspec to Hyracks.
-     * @param hdc
-     *            A Hyracks dataset client object that is used to read the results.
-     * @param resultDelivery
-     *            The {@code ResultDelivery} kind required for queries in the list of statements
-     * @param outMetadata
-     *            a reference to write the metadata of executed queries
-     * @param stats
-     *            a reference to write the stats of executed queries
+     * Compiles and executes a list of statements
+     * @param requestContext
      * @throws Exception
      */
-    void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
-            ResultMetadata outMetadata, Stats stats) throws Exception;
-
-    /**
-     * Compiles and execute a list of statements, with passing in client context id and context.
-     *
-     * @param hcc
-     *            A Hyracks client connection that is used to submit a jobspec to Hyracks.
-     * @param hdc
-     *            A Hyracks dataset client object that is used to read the results.
-     * @param resultDelivery
-     *            The {@code ResultDelivery} kind required for queries in the list of statements
-     * @param outMetadata
-     *            a reference to write the metadata of executed queries
-     * @param stats
-     *            a reference to write the stats of executed queries
-     * @param clientContextId
-     *            the client context id for the query
-     * @param ctx
-     *            the context that contains the meta information for all queries
-     * @throws Exception
-     */
-    void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
-            ResultMetadata outMetadata, Stats stats, String clientContextId, IStatementExecutorContext ctx)
-            throws Exception;
+    void compileAndExecute(IRequestContext requestContext) throws Exception;
 
     /**
      * rewrites and compiles query into a hyracks job specifications
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/RequestContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/RequestContext.java
new file mode 100644
index 0000000..442dd66
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/RequestContext.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.ctx;
+
+import java.util.Map;
+
+import org.apache.asterix.translator.IRequestContext;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.asterix.translator.IStatementExecutorContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+
+public class RequestContext implements IRequestContext {
+
+    private final IHyracksClientConnection hcc;
+    private final IHyracksDataset hdc;
+    private final ResultDelivery resultDelivery;
+    private final Stats stats;
+    private final Map<String, String> httpHeaders;
+    private final IStatementExecutor.ResultMetadata outMetadata;
+    private final String clientContextId;
+    private final IStatementExecutorContext ctx;
+
+    public RequestContext(Map<String, String> httpHeaders, IHyracksClientConnection hcc, IHyracksDataset hdc,
+            ResultDelivery resultDelivery, Stats stats, IStatementExecutor.ResultMetadata outMetadata,
+            String clientContextId, IStatementExecutorContext ctx) {
+        this.httpHeaders = httpHeaders;
+        this.hcc = hcc;
+        this.hdc = hdc;
+        this.resultDelivery = resultDelivery;
+        this.stats = stats;
+        this.outMetadata = outMetadata;
+        this.clientContextId = clientContextId;
+        this.ctx = ctx;
+    }
+
+    @Override
+    public IHyracksClientConnection getHyracksClientConnection() {
+        return hcc;
+    }
+
+    @Override
+    public IHyracksDataset getHyracksDataset() {
+        return hdc;
+    }
+
+    @Override
+    public IStatementExecutor.ResultDelivery getResultDelivery() {
+        return resultDelivery;
+    }
+
+    @Override
+    public IStatementExecutor.Stats getStats() {
+        return stats;
+    }
+
+    @Override
+    public IStatementExecutorContext getStatementExecutorContext() {
+        return ctx;
+    }
+
+    @Override
+    public Map<String, String> getHttpHeaders() {
+        return httpHeaders;
+    }
+
+    @Override
+    public IStatementExecutor.ResultMetadata getOutMetadata() {
+        return outMetadata;
+    }
+
+    @Override
+    public String getClientContextId() {
+        return clientContextId;
+    }
+}
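
For non-HTTP callers the web-specific fields are simply null, as the
AsterixJavaClient change below illustrates. A minimal usage sketch, assuming
a translator (IStatementExecutor) and a Hyracks client connection hcc already
exist in the calling code:

  IRequestContext requestContext = new RequestContext(
          null /* no HTTP headers for embedded callers */, hcc,
          null /* no dataset client */, IStatementExecutor.ResultDelivery.IMMEDIATE,
          new IStatementExecutor.Stats(), null /* outMetadata */,
          null /* clientContextId */, null /* executor context */);
  translator.compileAndExecute(requestContext);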
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index 7874aa3..864cad9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -35,6 +35,7 @@
 
 import javax.imageio.ImageIO;
 
+import org.apache.asterix.api.http.ctx.RequestContext;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.context.IStorageComponentProvider;
@@ -46,6 +47,7 @@
 import org.apache.asterix.lang.common.base.IParserFactory;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.translator.IRequestContext;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.asterix.translator.SessionConfig;
@@ -150,8 +152,9 @@
                     compilationProvider, componentProvider);
             double duration;
             long startTime = System.currentTimeMillis();
-            translator.compileAndExecute(hcc, hds, IStatementExecutor.ResultDelivery.IMMEDIATE,
-                    null, new IStatementExecutor.Stats());
+            IRequestContext requestContext = new RequestContext(HttpUtil.getRequestHeaders(request), hcc, hds,
+                    IStatementExecutor.ResultDelivery.IMMEDIATE, new IStatementExecutor.Stats(), null, null, null);
+            translator.compileAndExecute(requestContext);
             long endTime = System.currentTimeMillis();
             duration = (endTime - startTime) / 1000.00;
             out.println(HTML_STATEMENT_SEPARATOR);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 9547514..136dd4a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -19,11 +19,11 @@
 
 package org.apache.asterix.api.http.server;
 
+import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeoutException;
 import java.util.logging.Level;
 
-import io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.app.message.ExecuteStatementRequestMessage;
 import org.apache.asterix.app.message.ExecuteStatementResponseMessage;
@@ -41,6 +41,8 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.ipc.exceptions.IPCException;
 
+import io.netty.handler.codec.http.HttpResponseStatus;
+
 /**
  * Query service servlet that can run on NC nodes.
  * Delegates query execution to CC, then serves the result.
@@ -52,7 +54,7 @@
     }
 
     @Override
-    protected void executeStatement(String statementsText, SessionOutput sessionOutput,
+    protected void executeStatement(Map<String, String> httpHeaders, String statementsText, SessionOutput sessionOutput,
             IStatementExecutor.ResultDelivery delivery, IStatementExecutor.Stats stats, RequestParameters param,
             String handleUrl, long[] outExecStartEnd) throws Exception {
         // Running on NC -> send 'execute' message to CC
@@ -64,8 +66,9 @@
         MessageFuture responseFuture = ncMb.registerMessageFuture();
         try {
             ExecuteStatementRequestMessage requestMsg =
-                    new ExecuteStatementRequestMessage(ncCtx.getNodeId(), responseFuture.getFutureId(), queryLanguage,
-                            statementsText, sessionOutput.config(), ccDelivery, param.clientContextID, handleUrl);
+                    new ExecuteStatementRequestMessage(httpHeaders, ncCtx.getNodeId(), responseFuture.getFutureId(),
+                            queryLanguage, statementsText, sessionOutput.config(), ccDelivery, param.clientContextID,
+                            handleUrl);
             outExecStartEnd[0] = System.nanoTime();
             ncMb.sendMessageToCC(requestMsg);
             responseMsg = (ExecuteStatementResponseMessage) responseFuture.get(
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 9ee064e..c3ed2f3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -22,11 +22,13 @@
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.api.http.ctx.RequestContext;
 import org.apache.asterix.api.http.servlet.ServletConstants;
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.IClusterManagementWork;
@@ -40,6 +42,7 @@
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.translator.IRequestContext;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -90,7 +93,7 @@
     @Override
     protected void post(IServletRequest request, IServletResponse response) {
         try {
-            handleRequest(getRequestParameters(request), response);
+            handleRequest(request, response);
         } catch (IOException e) {
             // Servlet methods should not throw exceptions
             // http://cwe.mitre.org/data/definitions/600.html
@@ -387,7 +390,8 @@
         return "http://" + host + path + handlePath(delivery);
     }
 
-    private void handleRequest(RequestParameters param, IServletResponse response) throws IOException {
+    private void handleRequest(IServletRequest request, IServletResponse response) throws IOException {
+        RequestParameters param = getRequestParameters(request);
         LOGGER.info(param.toString());
         long elapsedStart = System.nanoTime();
         final StringWriter stringWriter = new StringWriter();
@@ -415,7 +419,8 @@
                 throw new AsterixException("Empty request, no statement provided");
             }
             String statementsText = param.statement + ";";
-            executeStatement(statementsText, sessionOutput, delivery, stats, param, handleUrl, execStartEnd);
+            executeStatement(HttpUtil.getRequestHeaders(request), statementsText, sessionOutput, delivery, stats, param,
+                    handleUrl, execStartEnd);
             if (ResultDelivery.IMMEDIATE == delivery || ResultDelivery.DEFERRED == delivery) {
                 ResultUtil.printStatus(sessionOutput, ResultStatus.SUCCESS);
             }
@@ -446,8 +451,9 @@
         }
     }
 
-    protected void executeStatement(String statementsText, SessionOutput sessionOutput, ResultDelivery delivery,
-            IStatementExecutor.Stats stats, RequestParameters param, String handleUrl, long[] outExecStartEnd)
+    protected void executeStatement(Map<String, String> httpHeaders, String statementsText, SessionOutput sessionOutput,
+            ResultDelivery delivery, IStatementExecutor.Stats stats, RequestParameters param, String handleUrl,
+            long[] outExecStartEnd)
             throws Exception {
         IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState();
         if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
@@ -460,8 +466,10 @@
         IStatementExecutor translator = statementExecutorFactory.create((ICcApplicationContext) appCtx, statements,
                 sessionOutput, compilationProvider, componentProvider);
         outExecStartEnd[0] = System.nanoTime();
-        translator.compileAndExecute(getHyracksClientConnection(), getHyracksDataset(), delivery, null, stats,
-                param.clientContextID, queryCtx);
+        IRequestContext requestContext =
+                new RequestContext(httpHeaders, getHyracksClientConnection(), getHyracksDataset(), delivery, stats,
+                        null, param.clientContextID, queryCtx);
+        translator.compileAndExecute(requestContext);
         outExecStartEnd[1] = System.nanoTime();
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index 18aae8e..61ee159 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -27,6 +27,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.api.http.ctx.RequestContext;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.common.config.GlobalConfig;
@@ -39,6 +40,7 @@
 import org.apache.asterix.lang.common.base.IParserFactory;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.translator.IRequestContext;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutorFactory;
@@ -173,7 +175,7 @@
             response.setHeader("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept");
             SessionOutput sessionOutput = initResponse(request, response);
             QueryTranslator.ResultDelivery resultDelivery = whichResultDelivery(request);
-            doHandle(response, query, sessionOutput, resultDelivery);
+            doHandle(request, response, query, sessionOutput, resultDelivery);
         } catch (Exception e) {
             response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
             LOGGER.log(Level.WARNING, "Failure handling request", e);
@@ -181,7 +183,7 @@
         }
     }
 
-    private void doHandle(IServletResponse response, String query, SessionOutput sessionOutput,
+    private void doHandle(IServletRequest request, IServletResponse response, String query, SessionOutput sessionOutput,
             ResultDelivery resultDelivery) throws JsonProcessingException {
         try {
             response.setStatus(HttpResponseStatus.OK);
@@ -203,7 +205,9 @@
             MetadataManager.INSTANCE.init();
             IStatementExecutor translator = statementExecutorFactory.create(appCtx, aqlStatements, sessionOutput,
                     compilationProvider, componentProvider);
-            translator.compileAndExecute(hcc, hds, resultDelivery, null, new IStatementExecutor.Stats());
+            IRequestContext requestContext = new RequestContext(HttpUtil.getRequestHeaders(request), hcc, hds,
+                    IStatementExecutor.ResultDelivery.IMMEDIATE, new IStatementExecutor.Stats(), null, null, null);
+            translator.compileAndExecute(requestContext);
         } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
             response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
             GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
index a9d24b9..262fb62 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
@@ -23,7 +23,7 @@
 import java.util.List;
 
 import org.apache.asterix.api.common.APIFramework;
-import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.api.http.ctx.RequestContext;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.utils.Job;
@@ -32,6 +32,7 @@
 import org.apache.asterix.lang.common.base.IParserFactory;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.translator.IRequestContext;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.asterix.translator.SessionConfig;
@@ -109,8 +110,10 @@
 
         IStatementExecutor translator = statementExecutorFactory.create(appCtx, statements, output, compilationProvider,
                 storageComponentProvider);
-        translator.compileAndExecute(hcc, null, QueryTranslator.ResultDelivery.IMMEDIATE,
-                null, new IStatementExecutor.Stats());
+        IRequestContext requestContext =
+                new RequestContext(null, hcc, null, IStatementExecutor.ResultDelivery.IMMEDIATE,
+                        new IStatementExecutor.Stats(), null, null, null);
+        translator.compileAndExecute(requestContext);
         writer.flush();
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index fc0c1ff..ef5024e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -22,10 +22,12 @@
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.List;
+import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.api.http.ctx.RequestContext;
 import org.apache.asterix.api.http.server.ResultUtil;
 import org.apache.asterix.app.cc.CCExtensionManager;
 import org.apache.asterix.common.api.IClusterManagementWork;
@@ -41,6 +43,7 @@
 import org.apache.asterix.messaging.CCMessageBroker;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.translator.IRequestContext;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutorFactory;
@@ -73,9 +76,11 @@
 
     private final String handleUrl;
 
-    public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang,
-            String statementsText, SessionConfig sessionConfig, IStatementExecutor.ResultDelivery delivery,
-            String clientContextID, String handleUrl) {
+    private final Map<String, String> httpHeaders;
+
+    public ExecuteStatementRequestMessage(Map<String, String> httpHeaders, String requestNodeId, long requestMessageId,
+            ILangExtension.Language lang, String statementsText, SessionConfig sessionConfig,
+            IStatementExecutor.ResultDelivery delivery, String clientContextID, String handleUrl) {
         this.requestNodeId = requestNodeId;
         this.requestMessageId = requestMessageId;
         this.lang = lang;
@@ -84,6 +89,7 @@
         this.delivery = delivery;
         this.clientContextID = clientContextID;
         this.handleUrl = handleUrl;
+        this.httpHeaders = httpHeaders;
     }
 
     @Override
@@ -122,11 +128,11 @@
                 IStatementExecutor.ResultMetadata outMetadata = new IStatementExecutor.ResultMetadata();
 
                 MetadataManager.INSTANCE.init();
-                IStatementExecutor translator = statementExecutorFactory.create(ccAppCtx, statements, sessionOutput,
-                        compilationProvider, storageComponentProvider);
-                translator.compileAndExecute(ccAppCtx.getHcc(), null, delivery, outMetadata,
-                        new IStatementExecutor.Stats(), clientContextID, statementExecutorContext);
-
+                IStatementExecutor translator = statementExecutorFactory
+                        .create(ccAppCtx, statements, sessionOutput, compilationProvider, storageComponentProvider);
+                IRequestContext requestContext = new RequestContext(httpHeaders, ccAppCtx.getHcc(), null, delivery,
+                        new IStatementExecutor.Stats(), outMetadata, clientContextID, statementExecutorContext);
+                translator.compileAndExecute(requestContext);
                 outPrinter.close();
                 responseMsg.setResult(outWriter.toString());
                 responseMsg.setMetadata(outMetadata);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 2967a38..98c1596 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -164,6 +164,7 @@
 import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement;
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
+import org.apache.asterix.translator.IRequestContext;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.SessionConfig;
@@ -254,15 +255,7 @@
     }
 
     @Override
-    public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
-            ResultMetadata outMetadata, Stats stats) throws Exception {
-        compileAndExecute(hcc, hdc, resultDelivery, outMetadata, stats, null, null);
-    }
-
-    @Override
-    public void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
-            ResultMetadata outMetadata, Stats stats, String clientContextId, IStatementExecutorContext ctx)
-            throws Exception {
+    public void compileAndExecute(IRequestContext requestContext) throws Exception {
         int resultSetIdCounter = 0;
         FileSplit outputFile = null;
         IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
@@ -274,6 +267,13 @@
          */
         String threadName = Thread.currentThread().getName();
         Thread.currentThread().setName(QueryTranslator.class.getSimpleName());
+        final IHyracksClientConnection hcc = requestContext.getHyracksClientConnection();
+        final IHyracksDataset hdc = requestContext.getHyracksDataset();
+        final ResultDelivery resultDelivery = requestContext.getResultDelivery();
+        final Stats stats = requestContext.getStats();
+        final ResultMetadata outMetadata = requestContext.getOutMetadata();
+        final String clientContextId = requestContext.getClientContextId();
+        final IStatementExecutorContext ctx = requestContext.getStatementExecutorContext();
         try {
             for (Statement stmt : statements) {
                 if (sessionConfig.is(SessionConfig.FORMAT_HTML)) {
@@ -394,8 +394,7 @@
                         // No op
                         break;
                     case Statement.Kind.EXTENSION:
-                        ((IExtensionStatement) stmt).handle(this, metadataProvider, hcc, hdc, resultDelivery, stats,
-                                resultSetIdCounter);
+                        handleExtensionMessage(stmt, requestContext, metadataProvider, resultSetIdCounter);
                         break;
                     default:
                         throw new CompilationException("Unknown function");
@@ -404,6 +403,12 @@
         } finally {
             Thread.currentThread().setName(threadName);
         }
+    }
+
+    protected void handleExtensionMessage(Statement stmt, IRequestContext requestContext,
+            MetadataProvider metadataProvider, int resultSetIdCounter)
+            throws HyracksDataException, AlgebricksException {
+        ((IExtensionStatement) stmt).handle(requestContext, this, metadataProvider, resultSetIdCounter);
     }
 
     protected void handleSetStatement(Statement stmt, Map<String, String> config) {
@@ -448,7 +453,6 @@
     }
 
     protected void handleCreateDataverseStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
-
         CreateDataverseStatement stmtCreateDataverse = (CreateDataverseStatement) stmt;
         String dvName = stmtCreateDataverse.getDataverseName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -479,8 +483,8 @@
     protected static void validateCompactionPolicy(String compactionPolicy,
             Map<String, String> compactionPolicyProperties, MetadataTransactionContext mdTxnCtx,
             boolean isExternalDataset) throws CompilationException, Exception {
-        CompactionPolicy compactionPolicyEntity = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx,
-                MetadataConstants.METADATA_DATAVERSE_NAME, compactionPolicy);
+        CompactionPolicy compactionPolicyEntity = MetadataManager.INSTANCE
+                .getCompactionPolicy(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, compactionPolicy);
         if (compactionPolicyEntity == null) {
             throw new CompilationException("Unknown compaction policy: " + compactionPolicy);
         }
@@ -529,10 +533,11 @@
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.createDatasetBegin(metadataProvider.getLocks(), dataverseName,
-                itemTypeDataverseName, itemTypeDataverseName + "." + itemTypeName, metaItemTypeDataverseName,
-                metaItemTypeDataverseName + "." + metaItemTypeName, nodegroupName, compactionPolicy,
-                dataverseName + "." + datasetName, defaultCompactionPolicy);
+        MetadataLockManager.INSTANCE
+                .createDatasetBegin(metadataProvider.getLocks(), dataverseName, itemTypeDataverseName,
+                        itemTypeDataverseName + "." + itemTypeName, metaItemTypeDataverseName,
+                        metaItemTypeDataverseName + "." + metaItemTypeName, nodegroupName, compactionPolicy,
+                        dataverseName + "." + datasetName, defaultCompactionPolicy);
         Dataset dataset = null;
         try {
             IDatasetDetails datasetDetails = null;
@@ -545,13 +550,14 @@
                     throw new AlgebricksException("A dataset with this name " + datasetName + " already exists.");
                 }
             }
-            Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
-                    itemTypeDataverseName, itemTypeName);
+            Datatype dt = MetadataManager.INSTANCE
+                    .getDatatype(metadataProvider.getMetadataTxnContext(), itemTypeDataverseName, itemTypeName);
             if (dt == null) {
                 throw new AlgebricksException(": type " + itemTypeName + " could not be found.");
             }
-            String ngName = ngNameId != null ? ngNameId.getValue()
-                    : configureNodegroupForDataset(appCtx, dd.getHints(), dataverseName, datasetName, metadataProvider);
+            String ngName = ngNameId != null ?
+                    ngNameId.getValue() :
+                    configureNodegroupForDataset(appCtx, dd.getHints(), dataverseName, datasetName, metadataProvider);
 
             if (compactionPolicy == null) {
                 compactionPolicy = GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME;
@@ -581,8 +587,9 @@
                             ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getKeySourceIndicators();
                     boolean autogenerated = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).isAutogenerated();
                     ARecordType aRecordType = (ARecordType) itemType;
-                    List<IAType> partitioningTypes = ValidateUtil.validatePartitioningExpressions(aRecordType,
-                            metaRecType, partitioningExprs, keySourceIndicators, autogenerated);
+                    List<IAType> partitioningTypes = ValidateUtil
+                            .validatePartitioningExpressions(aRecordType, metaRecType, partitioningExprs,
+                                    keySourceIndicators, autogenerated);

                     List<String> filterField = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getFilterField();
                     if (filterField != null) {
@@ -674,14 +681,15 @@
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
-                    MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-                            datasetName);
+                    MetadataManager.INSTANCE
+                            .dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is inconsistent state: pending dataset(" + dataverseName
-                            + "." + datasetName + ") couldn't be removed from the metadata", e);
+                    throw new IllegalStateException(
+                            "System is inconsistent state: pending dataset(" + dataverseName + "." + datasetName
+                                    + ") couldn't be removed from the metadata", e);
                 }
             }
             throw e;
@@ -705,8 +713,9 @@
             }
         }
         if (builder != null) {
-            throw new CompilationException("Dataset " + dataset.getDataverseName() + "." + dataset.getDatasetName()
-                    + " is currently being " + "fed into by the following active entities.\n" + builder.toString());
+            throw new CompilationException(
+                    "Dataset " + dataset.getDataverseName() + "." + dataset.getDatasetName() + " is currently being "
+                            + "fed into by the following active entities.\n" + builder.toString());
         }
     }
 
@@ -744,8 +753,8 @@
         List<Integer> keySourceIndicators = stmtCreateIndex.getFieldSourceIndicators();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.createIndexBegin(metadataProvider.getLocks(), dataverseName,
-                dataverseName + "." + datasetName);
+        MetadataLockManager.INSTANCE
+                .createIndexBegin(metadataProvider.getLocks(), dataverseName, dataverseName + "." + datasetName);
         String indexName = null;
         Dataset ds = null;
         // For external datasets
@@ -758,8 +767,8 @@
             }
 
             indexName = stmtCreateIndex.getIndexName().getValue();
-            index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                    datasetName, indexName);
+            index = MetadataManager.INSTANCE
+                    .getIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName, indexName);
             if (index != null) {
                 if (stmtCreateIndex.getIfNotExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -768,13 +777,15 @@
                     throw new AlgebricksException("An index with this name " + indexName + " already exists.");
                 }
             }
-            Datatype dt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
-                    ds.getItemTypeDataverseName(), ds.getItemTypeName());
+            Datatype dt = MetadataManager.INSTANCE
+                    .getDatatype(metadataProvider.getMetadataTxnContext(), ds.getItemTypeDataverseName(),
+                            ds.getItemTypeName());
             ARecordType aRecordType = (ARecordType) dt.getDatatype();
             ARecordType metaRecordType = null;
             if (ds.hasMetaPart()) {
-                Datatype metaDt = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
-                        ds.getMetaItemTypeDataverseName(), ds.getMetaItemTypeName());
+                Datatype metaDt = MetadataManager.INSTANCE
+                        .getDatatype(metadataProvider.getMetadataTxnContext(), ds.getMetaItemTypeDataverseName(),
+                                ds.getMetaItemTypeName());
                 metaRecordType = (ARecordType) metaDt.getDatatype();
             }
 
@@ -846,17 +857,19 @@
 
                     // If it is not a fixed length
                     if (typeTrait.getFixedLength() < 0) {
-                        throw new AlgebricksException("The keyword or ngram index -" + indexName
-                                + " cannot be created on the dataset -" + datasetName
-                                + " due to its variable-length primary key field - " + partitioningKey);
+                        throw new AlgebricksException(
+                                "The keyword or ngram index -" + indexName + " cannot be created on the dataset -"
+                                        + datasetName + " due to its variable-length primary key field - "
+                                        + partitioningKey);
                     }
 
                 }
             }
 
-            Index newIndex = new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(),
-                    indexFields, keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(),
-                    overridesFieldTypes, stmtCreateIndex.isEnforced(), false, MetadataUtil.PENDING_ADD_OP);
+            Index newIndex =
+                    new Index(dataverseName, datasetName, indexName, stmtCreateIndex.getIndexType(), indexFields,
+                            keySourceIndicators, indexFieldTypes, stmtCreateIndex.getGramLength(), overridesFieldTypes,
+                            stmtCreateIndex.isEnforced(), false, MetadataUtil.PENDING_ADD_OP);
             doCreateIndex(hcc, metadataProvider, ds, newIndex, jobFlags);
         } finally {
             metadataProvider.getLocks().unlock();
@@ -892,18 +905,19 @@
                 }
 
                 // Check if the files index exist
-                filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
-                        index.getDataverseName(), index.getDatasetName(),
-                        IndexingConstants.getFilesIndexName(index.getDatasetName()));
+                filesIndex = MetadataManager.INSTANCE
+                        .getIndex(metadataProvider.getMetadataTxnContext(), index.getDataverseName(),
+                                index.getDatasetName(), IndexingConstants.getFilesIndexName(index.getDatasetName()));
                 firstExternalDatasetIndex = filesIndex == null;
                 // Lock external dataset
                 ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, firstExternalDatasetIndex);
                 datasetLocked = true;
                 if (firstExternalDatasetIndex) {
                     // Verify that no one has created an index before we acquire the lock
-                    filesIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
-                            index.getDataverseName(), index.getDatasetName(),
-                            IndexingConstants.getFilesIndexName(index.getDatasetName()));
+                    filesIndex = MetadataManager.INSTANCE
+                            .getIndex(metadataProvider.getMetadataTxnContext(), index.getDataverseName(),
+                                    index.getDatasetName(),
+                                    IndexingConstants.getFilesIndexName(index.getDatasetName()));
                     if (filesIndex != null) {
                         ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds, firstExternalDatasetIndex);
                         firstExternalDatasetIndex = false;
@@ -925,8 +939,8 @@
                         MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
                     }
                     // This is the first index for the external dataset, replicate the files index
-                    spec = ExternalIndexingOperations.buildFilesIndexCreateJobSpec(ds, externalFilesSnapshot,
-                            metadataProvider);
+                    spec = ExternalIndexingOperations
+                            .buildFilesIndexCreateJobSpec(ds, externalFilesSnapshot, metadataProvider);
                     if (spec == null) {
                         throw new CompilationException(
                                 "Failed to create job spec for replicating Files Index For external dataset");
@@ -938,16 +952,17 @@
 
             // check whether there exists another enforced index on the same field
             if (index.isEnforced()) {
-                List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(
-                        metadataProvider.getMetadataTxnContext(), index.getDataverseName(), index.getDatasetName());
+                List<Index> indexes = MetadataManager.INSTANCE
+                        .getDatasetIndexes(metadataProvider.getMetadataTxnContext(), index.getDataverseName(),
+                                index.getDatasetName());
                 for (Index existingIndex : indexes) {
-                    if (existingIndex.getKeyFieldNames().equals(index.getKeyFieldNames())
-                            && !existingIndex.getKeyFieldTypes().equals(index.getKeyFieldTypes())
-                            && existingIndex.isEnforced()) {
-                        throw new CompilationException("Cannot create index " + index.getIndexName()
-                                + " , enforced index " + existingIndex.getIndexName() + " on field \""
-                                + StringUtils.join(index.getKeyFieldNames(), ',') + "\" is already defined with type \""
-                                + existingIndex.getKeyFieldTypes() + "\"");
+                    if (existingIndex.getKeyFieldNames().equals(index.getKeyFieldNames()) && !existingIndex
+                            .getKeyFieldTypes().equals(index.getKeyFieldTypes()) && existingIndex.isEnforced()) {
+                        throw new CompilationException(
+                                "Cannot create index " + index.getIndexName() + " , enforced index " + existingIndex
+                                        .getIndexName() + " on field \"" + StringUtils
+                                        .join(index.getKeyFieldNames(), ',') + "\" is already defined with type \""
+                                        + existingIndex.getKeyFieldTypes() + "\"");
                     }
                 }
             }
@@ -956,8 +971,9 @@
             // #. prepare to create the index artifact in NC.
             spec = IndexUtil.buildSecondaryIndexCreationJobSpec(ds, index, metadataProvider);
             if (spec == null) {
-                throw new CompilationException("Failed to create job spec for creating index '" + ds.getDatasetName()
-                        + "." + index.getIndexName() + "'");
+                throw new CompilationException(
+                        "Failed to create job spec for creating index '" + ds.getDatasetName() + "." + index
+                                .getIndexName() + "'");
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
@@ -1056,39 +1072,42 @@
                         abort(e, e2, mdTxnCtx);
                         throw new IllegalStateException(
                                 "System is inconsistent state: pending files for(" + index.getDataverseName() + "."
-                                        + index.getDatasetName() + ") couldn't be removed from the metadata",
-                                e);
+                                        + index.getDatasetName() + ") couldn't be removed from the metadata", e);
                     }
                     mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                     metadataProvider.setMetadataTxnContext(mdTxnCtx);
                     try {
                         // Drop the files index from metadata
-                        MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
-                                index.getDataverseName(), index.getDatasetName(),
-                                IndexingConstants.getFilesIndexName(index.getDatasetName()));
+                        MetadataManager.INSTANCE
+                                .dropIndex(metadataProvider.getMetadataTxnContext(), index.getDataverseName(),
+                                        index.getDatasetName(),
+                                        IndexingConstants.getFilesIndexName(index.getDatasetName()));
                         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     } catch (Exception e2) {
                         e.addSuppressed(e2);
                         abort(e, e2, mdTxnCtx);
-                        throw new IllegalStateException("System is inconsistent state: pending index("
-                                + index.getDataverseName() + "." + index.getDatasetName() + "."
-                                + IndexingConstants.getFilesIndexName(index.getDatasetName())
-                                + ") couldn't be removed from the metadata", e);
+                        throw new IllegalStateException(
+                                "System is inconsistent state: pending index(" + index.getDataverseName() + "." + index
+                                        .getDatasetName() + "." + IndexingConstants
+                                        .getFilesIndexName(index.getDatasetName())
+                                        + ") couldn't be removed from the metadata", e);
                     }
                 }
                 // remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
-                    MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
-                            index.getDataverseName(), index.getDatasetName(), index.getIndexName());
+                    MetadataManager.INSTANCE
+                            .dropIndex(metadataProvider.getMetadataTxnContext(), index.getDataverseName(),
+                                    index.getDatasetName(), index.getIndexName());
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is in inconsistent state: pending index("
-                            + index.getDataverseName() + "." + index.getDatasetName() + "." + index.getIndexName()
-                            + ") couldn't be removed from the metadata", e);
+                    throw new IllegalStateException(
+                            "System is in inconsistent state: pending index(" + index.getDataverseName() + "." + index
+                                    .getDatasetName() + "." + index.getIndexName()
+                                    + ") couldn't be removed from the metadata", e);
                 }
             }
             throw e;
@@ -1112,8 +1131,8 @@
         String typeName = stmtCreateType.getIdent().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.createTypeBegin(metadataProvider.getLocks(), dataverseName,
-                dataverseName + "." + typeName);
+        MetadataLockManager.INSTANCE
+                .createTypeBegin(metadataProvider.getLocks(), dataverseName, dataverseName + "." + typeName);
         try {
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
             if (dv == null) {
@@ -1128,8 +1147,9 @@
                 if (BuiltinTypeMap.getBuiltinType(typeName) != null) {
                     throw new AlgebricksException("Cannot redefine builtin type " + typeName + ".");
                 } else {
-                    Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx,
-                            stmtCreateType.getTypeDef(), stmtCreateType.getIdent().getValue(), dataverseName);
+                    Map<TypeSignature, IAType> typeMap = TypeTranslator
+                            .computeTypes(mdTxnCtx, stmtCreateType.getTypeDef(), stmtCreateType.getIdent().getValue(),
+                                    dataverseName);
                     TypeSignature typeSignature = new TypeSignature(dataverseName, typeName);
                     IAType type = typeMap.get(typeSignature);
                     MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new Datatype(dataverseName, typeName, type, false));
@@ -1179,8 +1199,8 @@
             tempMdProvider.setConfig(metadataProvider.getConfig());
             for (IActiveEntityEventsListener listener : activeListeners) {
                 EntityId activeEntityId = listener.getEntityId();
-                if (activeEntityId.getExtensionName().equals(Feed.EXTENSION_NAME)
-                        && activeEntityId.getDataverse().equals(dataverseName)) {
+                if (activeEntityId.getExtensionName().equals(Feed.EXTENSION_NAME) && activeEntityId.getDataverse()
+                        .equals(dataverseName)) {
                     tempMdProvider.getLocks().reset();
                     stopFeedBeforeDelete(new Pair<>(dvId, new Identifier(activeEntityId.getEntityName())),
                             tempMdProvider);
@@ -1207,8 +1227,8 @@
                             MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
                     for (int k = 0; k < indexes.size(); k++) {
                         if (ExternalIndexingOperations.isFileIndex(indexes.get(k))) {
-                            jobsToExecute.add(
-                                    ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, dataset));
+                            jobsToExecute.add(ExternalIndexingOperations
+                                    .buildDropFilesIndexJobSpec(metadataProvider, dataset));
                         } else {
                             jobsToExecute
                                     .add(IndexUtil.buildDropIndexJobSpec(indexes.get(k), metadataProvider, dataset));
@@ -1314,8 +1334,8 @@
         DropDatasetStatement stmtDelete = (DropDatasetStatement) stmt;
         String dataverseName = getActiveDataverse(stmtDelete.getDataverseName());
         String datasetName = stmtDelete.getDatasetName().getValue();
-        MetadataLockManager.INSTANCE.dropDatasetBegin(metadataProvider.getLocks(), dataverseName,
-                dataverseName + "." + datasetName);
+        MetadataLockManager.INSTANCE
+                .dropDatasetBegin(metadataProvider.getLocks(), dataverseName, dataverseName + "." + datasetName);
         try {
             doDropDataset(dataverseName, datasetName, metadataProvider, stmtDelete.getIfExists(), hcc, true);
         } finally {
@@ -1338,8 +1358,9 @@
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
                     return;
                 } else {
-                    throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
-                            + dataverseName + ".");
+                    throw new AlgebricksException(
+                            "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName
+                                    + ".");
                 }
             }
             ds.drop(metadataProvider, mdTxnCtx, jobsToExecute, bActiveTxn, progress, hcc, dropCorrespondingNodeGroup);
@@ -1365,14 +1386,15 @@
                 mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction());
                 metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue());
                 try {
-                    MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
-                            datasetName);
+                    MetadataManager.INSTANCE
+                            .dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     abort(e, e2, mdTxnCtx.getValue());
-                    throw new IllegalStateException("System is inconsistent state: pending dataset(" + dataverseName
-                            + "." + datasetName + ") couldn't be removed from the metadata", e);
+                    throw new IllegalStateException(
+                            "System is inconsistent state: pending dataset(" + dataverseName + "." + datasetName
+                                    + ") couldn't be removed from the metadata", e);
                 }
             }
             throw e;
@@ -1392,8 +1414,8 @@
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         List<JobSpecification> jobsToExecute = new ArrayList<>();
-        MetadataLockManager.INSTANCE.dropIndexBegin(metadataProvider.getLocks(), dataverseName,
-                dataverseName + "." + datasetName);
+        MetadataLockManager.INSTANCE
+                .dropIndexBegin(metadataProvider.getLocks(), dataverseName, dataverseName + "." + datasetName);
         String indexName = null;
         // For external index
         boolean dropFilesIndex = false;
@@ -1416,8 +1438,9 @@
                 }
             }
             if (builder != null) {
-                throw new CompilationException("Dataset" + datasetName
-                        + " is currently being fed into by the following active entities: " + builder.toString());
+                throw new CompilationException(
+                        "Dataset" + datasetName + " is currently being fed into by the following active entities: "
+                                + builder.toString());
             }
 
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
@@ -1484,8 +1507,8 @@
                             jobsToExecute
                                     .add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds));
                             // #. mark PendingDropOp on the existing files index
-                            MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName,
-                                    externalIndex.getIndexName());
+                            MetadataManager.INSTANCE
+                                    .dropIndex(mdTxnCtx, dataverseName, datasetName, externalIndex.getIndexName());
                             MetadataManager.INSTANCE.addIndex(mdTxnCtx,
                                     new Index(dataverseName, datasetName, externalIndex.getIndexName(),
                                             externalIndex.getIndexType(), externalIndex.getKeyFieldNames(),
@@ -1551,18 +1574,20 @@
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
-                    MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                            datasetName, indexName);
+                    MetadataManager.INSTANCE
+                            .dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName, indexName);
                     if (dropFilesIndex) {
-                        MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                                datasetName, IndexingConstants.getFilesIndexName(datasetName));
+                        MetadataManager.INSTANCE
+                                .dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
+                                        IndexingConstants.getFilesIndexName(datasetName));
                     }
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName + "."
-                            + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
+                    throw new IllegalStateException(
+                            "System is inconsistent state: pending index(" + dataverseName + "." + datasetName + "."
+                                    + indexName + ") couldn't be removed from the metadata", e);
                 }
             }
 
@@ -1582,8 +1607,8 @@
 
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.dropTypeBegin(metadataProvider.getLocks(), dataverseName,
-                dataverseName + "." + typeName);
+        MetadataLockManager.INSTANCE
+                .dropTypeBegin(metadataProvider.getLocks(), dataverseName, dataverseName + "." + typeName);
         try {
             Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, typeName);
             if (dt == null) {
@@ -1635,17 +1660,18 @@
 
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.functionStatementBegin(metadataProvider.getLocks(), dataverse,
-                dataverse + "." + functionName);
+        MetadataLockManager.INSTANCE
+                .functionStatementBegin(metadataProvider.getLocks(), dataverse, dataverse + "." + functionName);
         try {
             Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverse);
             if (dv == null) {
                 throw new AlgebricksException("There is no dataverse with this 
name " + dataverse + ".");
             }
             // If the function body contains function calls, theirs reference count won't be increased.
-            Function function = new Function(dataverse, functionName, cfs.getaAterixFunction().getArity(),
-                    cfs.getParamList(), Function.RETURNTYPE_VOID, cfs.getFunctionBody(), Function.LANGUAGE_AQL,
-                    FunctionKind.SCALAR.toString(), 0);
+            Function function =
+                    new Function(dataverse, functionName, cfs.getaAterixFunction().getArity(), cfs.getParamList(),
+                            Function.RETURNTYPE_VOID, cfs.getFunctionBody(), Function.LANGUAGE_AQL,
+                            FunctionKind.SCALAR.toString(), 0);
             MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1693,8 +1719,8 @@
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.modifyDatasetBegin(metadataProvider.getLocks(), dataverseName,
-                dataverseName + "." + datasetName);
+        MetadataLockManager.INSTANCE
+                .modifyDatasetBegin(metadataProvider.getLocks(), dataverseName, dataverseName + "." + datasetName);
         try {
             CompiledLoadFromFileStatement cls =
                     new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName().getValue(),
@@ -1861,8 +1887,8 @@
         String feedName = cfs.getFeedName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.createFeedBegin(metadataProvider.getLocks(), dataverseName,
-                dataverseName + "." + feedName);
+        MetadataLockManager.INSTANCE
+                .createFeedBegin(metadataProvider.getLocks(), dataverseName, dataverseName + "." + feedName);
         Feed feed = null;
         try {
             feed = MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(), dataverseName, feedName);
@@ -1896,8 +1922,8 @@
         CreateFeedPolicyStatement cfps = (CreateFeedPolicyStatement) stmt;
         dataverse = getActiveDataverse(null);
         policy = cfps.getPolicyName();
-        MetadataLockManager.INSTANCE.createFeedPolicyBegin(metadataProvider.getLocks(), dataverse,
-                dataverse + "." + policy);
+        MetadataLockManager.INSTANCE
+                .createFeedPolicyBegin(metadataProvider.getLocks(), dataverse, dataverse + "." + policy);
         try {
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1957,8 +1983,8 @@
         String feedName = stmtFeedDrop.getFeedName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        MetadataLockManager.INSTANCE.dropFeedBegin(metadataProvider.getLocks(), dataverseName,
-                dataverseName + "." + feedName);
+        MetadataLockManager.INSTANCE
+                .dropFeedBegin(metadataProvider.getLocks(), dataverseName, dataverseName + "." + feedName);
         try {
             Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, dataverseName, feedName);
             if (feed == null) {
@@ -1975,8 +2001,9 @@
             ActiveEntityEventsListener listener =
                     (ActiveEntityEventsListener) activeEventHandler.getActiveEntityListener(feedId);
             if (listener != null) {
-                throw new AlgebricksException("Feed " + feedId
-                        + " is currently active and connected to the following dataset(s) \n" + listener.toString());
+                throw new AlgebricksException(
+                        "Feed " + feedId + " is currently active and connected to the following dataset(s) \n"
+                                + listener.toString());
             } else {
                 JobSpecification spec = FeedOperations.buildRemoveFeedStorageJob(metadataProvider,
                         MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName()));
@@ -2003,8 +2030,8 @@
         FeedPolicyDropStatement stmtFeedPolicyDrop = (FeedPolicyDropStatement) stmt;
         String dataverseName = getActiveDataverse(stmtFeedPolicyDrop.getDataverseName());
         String policyName = stmtFeedPolicyDrop.getPolicyName().getValue();
-        MetadataLockManager.INSTANCE.dropFeedPolicyBegin(metadataProvider.getLocks(), dataverseName,
-                dataverseName + "." + policyName);
+        MetadataLockManager.INSTANCE
+                .dropFeedPolicyBegin(metadataProvider.getLocks(), dataverseName, dataverseName + "." + policyName);
         try {
             FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverseName, policyName);
             if (feedPolicy == null) {
@@ -2035,8 +2062,8 @@
         // Runtime handler
         EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName);
         // Feed & Feed Connections
-        Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName,
-                metadataProvider.getMetadataTxnContext());
+        Feed feed = FeedMetadataUtil
+                .validateIfFeedExists(dataverseName, feedName, metadataProvider.getMetadataTxnContext());
         List<FeedConnection> feedConnections = MetadataManager.INSTANCE
                 .getFeedConections(metadataProvider.getMetadataTxnContext(), dataverseName, feedName);
         ILangCompilationProvider compilationProvider = new AqlCompilationProvider();
@@ -2044,14 +2071,15 @@
         DefaultStatementExecutorFactory qtFactory = new DefaultStatementExecutorFactory();
         ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
         ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
-        ActiveEntityEventsListener listener = (ActiveEntityEventsListener) activeEventHandler
-                .getActiveEntityListener(entityId);
+        ActiveEntityEventsListener listener =
+                (ActiveEntityEventsListener) activeEventHandler.getActiveEntityListener(entityId);
         if (listener != null) {
             throw new AlgebricksException("Feed " + feedName + " is started 
already.");
         }
         // Start
-        MetadataLockManager.INSTANCE.startFeedBegin(metadataProvider.getLocks(), dataverseName,
-                dataverseName + "." + feedName, feedConnections);
+        MetadataLockManager.INSTANCE
+                .startFeedBegin(metadataProvider.getLocks(), dataverseName, dataverseName + "." + feedName,
+                        feedConnections);
         try {
             // Prepare policy
             List<IDataset> datasets = new ArrayList<>();
@@ -2133,20 +2161,21 @@
         // Check whether feed is alive
         ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
         ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
-        if (activeEventHandler
-                .getActiveEntityListener(new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName)) != null) {
+        if (activeEventHandler.getActiveEntityListener(new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName))
+                != null) {
             throw new CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED, feedName);
         }
         // Transaction handling
-        MetadataLockManager.INSTANCE.connectFeedBegin(metadataProvider.getLocks(), dataverseName,
-                dataverseName + "." + datasetName, dataverseName + "." + feedName);
+        MetadataLockManager.INSTANCE
+                .connectFeedBegin(metadataProvider.getLocks(), dataverseName, dataverseName + "." + datasetName,
+                        dataverseName + "." + feedName);
         try {
             // validation
             FeedMetadataUtil.validateIfDatasetExists(metadataProvider, dataverseName, datasetName, mdTxnCtx);
-            Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName,
-                    metadataProvider.getMetadataTxnContext());
-            ARecordType outputType = FeedMetadataUtil.getOutputType(feed, feed.getAdapterConfiguration(),
-                    ExternalDataConstants.KEY_TYPE_NAME);
+            Feed feed = FeedMetadataUtil
+                    .validateIfFeedExists(dataverseName, feedName, metadataProvider.getMetadataTxnContext());
+            ARecordType outputType = FeedMetadataUtil
+                    .getOutputType(feed, feed.getAdapterConfiguration(), ExternalDataConstants.KEY_TYPE_NAME);
             List<FunctionSignature> appliedFunctions = cfs.getAppliedFunctions();
             for (FunctionSignature func : appliedFunctions) {
                 if (MetadataManager.INSTANCE.getFunction(mdTxnCtx, func) == null) {
@@ -2154,8 +2183,8 @@
                             func.getName());
                 }
             }
-            fc = MetadataManager.INSTANCE.getFeedConnection(metadataProvider.getMetadataTxnContext(), dataverseName,
-                    feedName, datasetName);
+            fc = MetadataManager.INSTANCE
+                    .getFeedConnection(metadataProvider.getMetadataTxnContext(), dataverseName, feedName, datasetName);
             if (fc != null) {
                 throw new AlgebricksException("Feed" + feedName + " is already 
connected dataset " + datasetName);
             }
@@ -2188,21 +2217,23 @@
         ActiveLifecycleListener activeListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
         ActiveJobNotificationHandler activeEventHandler = activeListener.getNotificationHandler();
         // Check whether feed is alive
-        if (activeEventHandler
-                .getActiveEntityListener(new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName)) != null) {
+        if (activeEventHandler.getActiveEntityListener(new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName))
+                != null) {
             throw new CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED, feedName);
         }
-        MetadataLockManager.INSTANCE.disconnectFeedBegin(metadataProvider.getLocks(), dataverseName,
-                dataverseName + "." + datasetName, dataverseName + "." + cfs.getFeedName());
+        MetadataLockManager.INSTANCE
+                .disconnectFeedBegin(metadataProvider.getLocks(), dataverseName, dataverseName + "." + datasetName,
+                        dataverseName + "." + cfs.getFeedName());
         try {
             FeedMetadataUtil.validateIfDatasetExists(metadataProvider, dataverseName, cfs.getDatasetName().getValue(),
                     mdTxnCtx);
             FeedMetadataUtil.validateIfFeedExists(dataverseName, cfs.getFeedName().getValue(), mdTxnCtx);
-            FeedConnection fc = MetadataManager.INSTANCE.getFeedConnection(metadataProvider.getMetadataTxnContext(),
-                    dataverseName, feedName, datasetName);
+            FeedConnection fc = MetadataManager.INSTANCE
+                    .getFeedConnection(metadataProvider.getMetadataTxnContext(), dataverseName, feedName, datasetName);
             if (fc == null) {
-                throw new CompilationException("Feed " + feedName + " is currently not connected to "
-                        + cfs.getDatasetName().getValue() + ". Invalid operation!");
+                throw new CompilationException(
+                        "Feed " + feedName + " is currently not connected to " + cfs.getDatasetName().getValue()
+                                + ". Invalid operation!");
             }
             MetadataManager.INSTANCE.dropFeedConnection(mdTxnCtx, dataverseName, feedName, datasetName);
             for (FunctionSignature functionSignature : fc.getAppliedFunctions()) {
@@ -2228,8 +2259,8 @@
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         List<JobSpecification> jobsToExecute = new ArrayList<>();
-        MetadataLockManager.INSTANCE.compactBegin(metadataProvider.getLocks(), dataverseName,
-                dataverseName + "." + datasetName);
+        MetadataLockManager.INSTANCE
+                .compactBegin(metadataProvider.getLocks(), dataverseName, dataverseName + "." + datasetName);
         try {
             Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
             if (ds == null) {
@@ -2339,8 +2370,9 @@
         switch (resultDelivery) {
             case ASYNC:
                 MutableBoolean printed = new MutableBoolean(false);
-                executorService.submit(() -> asyncCreateAndRunJob(hcc, compiler, locker, resultDelivery,
-                        clientContextId, ctx, resultSetId, printed));
+                executorService
+                        .submit(() -> asyncCreateAndRunJob(hcc, compiler, locker, resultDelivery, clientContextId, ctx,
+                                resultSetId, printed));
                 synchronized (printed) {
                     while (!printed.booleanValue()) {
                         printed.wait();
@@ -2388,8 +2420,9 @@
                 ResultUtil.printStatus(sessionOutput, AbstractQueryApiServlet.ResultStatus.FAILED);
                 ResultUtil.printError(sessionOutput.out(), e);
             } else {
-                GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE,
-                        resultDelivery.name() + " job with id " + jobId.getValue() + " " + "failed", e);
+                GlobalConfig.ASTERIX_LOGGER
+                        .log(Level.SEVERE, resultDelivery.name() + " job with id " + jobId.getValue() + " " + "failed",
+                                e);
             }
         } finally {
             synchronized (printed) {
@@ -2491,8 +2524,8 @@
         Dataset transactionDataset = null;
         boolean lockAquired = false;
         boolean success = false;
-        MetadataLockManager.INSTANCE.refreshDatasetBegin(metadataProvider.getLocks(), dataverseName,
-                dataverseName + "." + datasetName);
+        MetadataLockManager.INSTANCE
+                .refreshDatasetBegin(metadataProvider.getLocks(), dataverseName, dataverseName + "." + datasetName);
         try {
             ds = metadataProvider.findDataset(dataverseName, datasetName);
             // Dataset exists ?
@@ -2527,8 +2560,8 @@
 
             // Compute delta
             // Now we compare snapshot with external file system
-            if (ExternalIndexingOperations.isDatasetUptodate(ds, metadataFiles, addedFiles, deletedFiles,
-                    appendedFiles)) {
+            if (ExternalIndexingOperations
+                    .isDatasetUptodate(ds, metadataFiles, addedFiles, deletedFiles, appendedFiles)) {
                 ((ExternalDatasetDetails) ds.getDatasetDetails()).setRefreshTimestamp(txnTime);
                 MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -2556,8 +2589,8 @@
             }
 
             // Create the files index update job
-            spec = ExternalIndexingOperations.buildFilesIndexUpdateOp(ds, metadataFiles, addedFiles, appendedFiles,
-                    metadataProvider);
+            spec = ExternalIndexingOperations
+                    .buildFilesIndexUpdateOp(ds, metadataFiles, addedFiles, appendedFiles, metadataProvider);
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
@@ -2568,8 +2601,8 @@
 
             for (Index index : indexes) {
                 if (!ExternalIndexingOperations.isFileIndex(index)) {
-                    spec = ExternalIndexingOperations.buildIndexUpdateOp(ds, index, metadataFiles, addedFiles,
-                            appendedFiles, metadataProvider);
+                    spec = ExternalIndexingOperations
+                            .buildIndexUpdateOp(ds, index, metadataFiles, addedFiles, appendedFiles, metadataProvider);
                     // run the files update job
                     runJob(hcc, spec);
                 }
@@ -2642,8 +2675,9 @@
                 abort(e, e, mdTxnCtx);
             }
             if (transactionState == TransactionState.READY_TO_COMMIT) {
-                throw new IllegalStateException("System is inconsistent state: commit of (" + dataverseName + "."
-                        + datasetName + ") refresh couldn't carry out the commit phase", e);
+                throw new IllegalStateException(
+                        "System is inconsistent state: commit of (" + dataverseName + "." + datasetName
+                                + ") refresh couldn't carry out the commit phase", e);
             }
             if (transactionState == TransactionState.COMMIT) {
                 // Nothing to do , everything should be clean
@@ -2742,8 +2776,9 @@
             }
 
             // Constructs the pregelix command line.
-            List<String> cmd = constructPregelixCommand(pregelixStmt, dataverseNameFrom, datasetNameFrom,
-                    dataverseNameTo, datasetNameTo);
+            List<String> cmd =
+                    constructPregelixCommand(pregelixStmt, dataverseNameFrom, datasetNameFrom, dataverseNameTo,
+                            datasetNameTo);
             ProcessBuilder pb = new ProcessBuilder(cmd);
             pb.directory(new File(pregelixHome));
             pb.redirectErrorStream(true);
@@ -2778,8 +2813,9 @@
         // Validates the source/sink dataverses and datasets.
         Dataset fromDataset = metadataProvider.findDataset(dataverseNameFrom, datasetNameFrom);
         if (fromDataset == null) {
-            throw new CompilationException("The source dataset " + datasetNameFrom + " in dataverse "
-                    + dataverseNameFrom + " could not be found for the Run command");
+            throw new CompilationException(
+                    "The source dataset " + datasetNameFrom + " in dataverse " + dataverseNameFrom
+                            + " could not be found for the Run command");
         }
         Dataset toDataset = metadataProvider.findDataset(dataverseNameTo, datasetNameTo);
         if (toDataset == null) {
@@ -2790,8 +2826,8 @@
         try {
             // Find the primary index of the sink dataset.
             Index toIndex = null;
-            List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameTo,
-                    pregelixStmt.getDatasetNameTo().getValue());
+            List<Index> indexes = MetadataManager.INSTANCE
+                    .getDatasetIndexes(mdTxnCtx, dataverseNameTo, pregelixStmt.getDatasetNameTo().getValue());
             for (Index index : indexes) {
                 if (index.isPrimaryIndex()) {
                     toIndex = index;
@@ -2805,15 +2841,18 @@
             DropDatasetStatement dropStmt =
                     new DropDatasetStatement(new Identifier(dataverseNameTo), pregelixStmt.getDatasetNameTo(), true);
             this.handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
-            IDatasetDetailsDecl idd = new InternalDetailsDecl(toIndex.getKeyFieldNames(),
-                    toIndex.getKeyFieldSourceIndicators(), false, null, toDataset.getDatasetDetails().isTemp());
-            DatasetDecl createToDataset = new DatasetDecl(new Identifier(dataverseNameTo),
-                    pregelixStmt.getDatasetNameTo(), new Identifier(toDataset.getItemTypeDataverseName()),
-                    new Identifier(toDataset.getItemTypeName()),
-                    new Identifier(toDataset.getMetaItemTypeDataverseName()),
-                    new Identifier(toDataset.getMetaItemTypeName()), new Identifier(toDataset.getNodeGroupName()),
-                    toDataset.getCompactionPolicy(), toDataset.getCompactionPolicyProperties(), toDataset.getHints(),
-                    toDataset.getDatasetType(), idd, false);
+            IDatasetDetailsDecl idd =
+                    new InternalDetailsDecl(toIndex.getKeyFieldNames(), toIndex.getKeyFieldSourceIndicators(), false,
+                            null, toDataset.getDatasetDetails().isTemp());
+            DatasetDecl createToDataset =
+                    new DatasetDecl(new Identifier(dataverseNameTo), pregelixStmt.getDatasetNameTo(),
+                            new Identifier(toDataset.getItemTypeDataverseName()),
+                            new Identifier(toDataset.getItemTypeName()),
+                            new Identifier(toDataset.getMetaItemTypeDataverseName()),
+                            new Identifier(toDataset.getMetaItemTypeName()),
+                            new Identifier(toDataset.getNodeGroupName()), toDataset.getCompactionPolicy(),
+                            toDataset.getCompactionPolicyProperties(), toDataset.getHints(), toDataset.getDatasetType(),
+                            idd, false);
             this.handleCreateDatasetStatement(metadataProvider, createToDataset, hcc);
         } catch (Exception e) {
             LOGGER.log(Level.WARNING, e.getMessage(), e);
@@ -2862,8 +2901,9 @@
         ExternalProperties externalProperties = appCtx.getExternalProperties();
         String clientIP = ClusterProperties.INSTANCE.getCluster().getMasterNode().getClientIp();
         StringBuilder asterixdbParameterBuilder = new StringBuilder();
-        asterixdbParameterBuilder.append(
-                "pregelix.asterixdb.url=" + "http://" + clientIP + ":" + externalProperties.getAPIServerPort() + ",");
+        asterixdbParameterBuilder
+                .append("pregelix.asterixdb.url=" + "http://" + clientIP + ":" + externalProperties.getAPIServerPort()
+                        + ",");
         asterixdbParameterBuilder.append("pregelix.asterixdb.source=true,");
         asterixdbParameterBuilder.append("pregelix.asterixdb.sink=true,");
         asterixdbParameterBuilder.append("pregelix.asterixdb.input.dataverse=" 
+ fromDataverseName + ",");
@@ -2911,8 +2951,8 @@
             asterixdbParameterBuilder.append(inputConverterClassKey + inputConverterClassValue);
             asterixdbParameterBuilder.append(outputConverterClassKey + outputConverterClassValue);
             // Remove the last comma.
-            asterixdbParameterBuilder.delete(asterixdbParameterBuilder.length() - 1,
-                    asterixdbParameterBuilder.length());
+            asterixdbParameterBuilder
+                    .delete(asterixdbParameterBuilder.length() - 1, asterixdbParameterBuilder.length());
             cmds.add(asterixdbParameterBuilder.toString());
         }
         return cmds;
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
index c11deef..788b6aa 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -87,6 +88,18 @@
         response.setHeader(HttpHeaderNames.CONTENT_TYPE, type);
     }
 
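+    /**
+     * Collects the headers of an HTTP request into a map. Note that if a
+     * header appears more than once, only its last value is kept.
+     */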
+    public static Map<String, String> getRequestHeaders(IServletRequest request) {
+        Map<String, String> headers = new HashMap<>();
+        request.getHttpRequest().headers().forEach(entry -> {
+            headers.put(entry.getKey(), entry.getValue());
+        });
+        return headers;
+    }
+
     /**
      * Get the mime string representation from the extension
      *

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1867
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie918f4d3f8dae41d07536041c591c59946a077f4
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>
