Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1867
Change subject: [ASTERIXDB-1973][OTR] Make IExtensionStatement#handle extensible
......................................................................
[ASTERIXDB-1973][OTR] Make IExtensionStatement#handle extensible
- user model changes: no
- storage format changes: no
- interface changes:
Introduce IRequestContext to encapsulate request parameters
and use it in IStatementExecutor#compileAndExecute and
IExtensionStatement#handle.
Details:
- Introduce IRequestContext and its default implementation.
- Make handling IExtensionStatement in IStatementExecutor extensible.
- Include HTTP headers in IRequestContext since they might be needed
by extensions to pass some information to extension statements
(e.g., authentication headers).
Change-Id: Ie918f4d3f8dae41d07536041c591c59946a077f4
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
A
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestContext.java
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
A
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/RequestContext.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
12 files changed, 460 insertions(+), 262 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/67/1867/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
index b0f863b..29d5a58 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
@@ -20,12 +20,9 @@
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.IRequestContext;
import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
-import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
/**
@@ -42,14 +39,13 @@
* Called when the {@code IStatementExecutor} encounters an extension
statement.
* An implementation class should implement the actual processing of the
statement in this method.
*
- * @param queryTranslator
- * @param metadataProvider
+ * @param requestContext
* @param statementExecutor
- * @param hcc
- * @param resultSetIdCounter
- * @throws Exception
+ * @param metadataProvider
+ * @param resultSetId
+ * @throws HyracksDataException
+ * @throws AlgebricksException
*/
- void handle(IStatementExecutor statementExecutor, MetadataProvider
metadataProvider,
- IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery
resultDelivery, Stats stats,
- int resultSetIdCounter) throws HyracksDataException,
AlgebricksException;
-}
+ void handle(IRequestContext requestContext, IStatementExecutor
statementExecutor, MetadataProvider metadataProvider,
+ int resultSetId) throws HyracksDataException, AlgebricksException;
+}
\ No newline at end of file
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestContext.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestContext.java
new file mode 100644
index 0000000..0665420
--- /dev/null
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.translator;
+
+import java.util.Map;
+
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+
+public interface IRequestContext {
+
+ /**
+ * @return A Hyracks client connection that is used to submit a jobspec to
Hyracks.
+ */
+ IHyracksClientConnection getHyracksClientConnection();
+
+ /**
+ * @return A Hyracks dataset client object that is used to read the
results.
+ */
+ IHyracksDataset getHyracksDataset();
+
+ /**
+ * @return The {@code ResultDelivery} kind required for queries in the
list of statements
+ */
+ IStatementExecutor.ResultDelivery getResultDelivery();
+
+ /**
+ * @return a reference to write the stats of executed queries
+ */
+ Stats getStats();
+
+ /**
+ * @return a reference to write the metadata of executed queries
+ */
+ IStatementExecutor.ResultMetadata getOutMetadata();
+
+ /**
+ * @return the client context id for the query
+ */
+ String getClientContextId();
+
+ /**
+ * @return a reference for the statement executor for this request
+ */
+ IStatementExecutorContext getStatementExecutorContext();
+
+ /**
+ * @return The headers of the HTTP request if the request is coming from
the web. Otherwise null.
+ */
+ Map<String, String> getHttpHeaders();
+}
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 19f0dcc..7ec5aa9 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -32,8 +32,6 @@
import org.apache.commons.lang3.tuple.Triple;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IClusterInfoCollector;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
@@ -94,45 +92,11 @@
}
/**
- * Compiles and execute a list of statements, without passing in client
context id and context.
- *
- * @param hcc
- * A Hyracks client connection that is used to submit a jobspec
to Hyracks.
- * @param hdc
- * A Hyracks dataset client object that is used to read the
results.
- * @param resultDelivery
- * The {@code ResultDelivery} kind required for queries in the
list of statements
- * @param outMetadata
- * a reference to write the metadata of executed queries
- * @param stats
- * a reference to write the stats of executed queries
+ * Compiles and executes a list of statements
+ * @param requestContext
* @throws Exception
*/
- void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc,
ResultDelivery resultDelivery,
- ResultMetadata outMetadata, Stats stats) throws Exception;
-
- /**
- * Compiles and execute a list of statements, with passing in client
context id and context.
- *
- * @param hcc
- * A Hyracks client connection that is used to submit a jobspec
to Hyracks.
- * @param hdc
- * A Hyracks dataset client object that is used to read the
results.
- * @param resultDelivery
- * The {@code ResultDelivery} kind required for queries in the
list of statements
- * @param outMetadata
- * a reference to write the metadata of executed queries
- * @param stats
- * a reference to write the stats of executed queries
- * @param clientContextId
- * the client context id for the query
- * @param ctx
- * the context that contains the meta information for all
queries
- * @throws Exception
- */
- void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc,
ResultDelivery resultDelivery,
- ResultMetadata outMetadata, Stats stats, String clientContextId,
IStatementExecutorContext ctx)
- throws Exception;
+ void compileAndExecute(IRequestContext requestContext) throws Exception;
/**
* rewrites and compiles query into a hyracks job specifications
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/RequestContext.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/RequestContext.java
new file mode 100644
index 0000000..442dd66
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/RequestContext.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.ctx;
+
+import java.util.Map;
+
+import org.apache.asterix.translator.IRequestContext;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.asterix.translator.IStatementExecutorContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+
+public class RequestContext implements IRequestContext {
+
+ private final IHyracksClientConnection hcc;
+ private final IHyracksDataset hdc;
+ private final ResultDelivery resultDelivery;
+ private final Stats stats;
+ private final Map<String, String> httpHeaders;
+ private final IStatementExecutor.ResultMetadata outMetadata;
+ private final String clientContextId;
+ private final IStatementExecutorContext ctx;
+
+ public RequestContext(Map<String, String> httpHeaders,
IHyracksClientConnection hcc, IHyracksDataset hdc,
+ ResultDelivery resultDelivery, Stats stats,
IStatementExecutor.ResultMetadata outMetadata,
+ String clientContextId, IStatementExecutorContext ctx) {
+ this.httpHeaders = httpHeaders;
+ this.hcc = hcc;
+ this.hdc = hdc;
+ this.resultDelivery = resultDelivery;
+ this.stats = stats;
+ this.outMetadata = outMetadata;
+ this.clientContextId = clientContextId;
+ this.ctx = ctx;
+ }
+
+ @Override
+ public IHyracksClientConnection getHyracksClientConnection() {
+ return hcc;
+ }
+
+ @Override
+ public IHyracksDataset getHyracksDataset() {
+ return hdc;
+ }
+
+ @Override
+ public IStatementExecutor.ResultDelivery getResultDelivery() {
+ return resultDelivery;
+ }
+
+ @Override
+ public IStatementExecutor.Stats getStats() {
+ return stats;
+ }
+
+ @Override
+ public IStatementExecutorContext getStatementExecutorContext() {
+ return ctx;
+ }
+
+ @Override
+ public Map<String, String> getHttpHeaders() {
+ return httpHeaders;
+ }
+
+ @Override
+ public IStatementExecutor.ResultMetadata getOutMetadata() {
+ return outMetadata;
+ }
+
+ @Override
+ public String getClientContextId() {
+ return clientContextId;
+ }
+}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index 7874aa3..864cad9 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -35,6 +35,7 @@
import javax.imageio.ImageIO;
+import org.apache.asterix.api.http.ctx.RequestContext;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.context.IStorageComponentProvider;
@@ -46,6 +47,7 @@
import org.apache.asterix.lang.common.base.IParserFactory;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.translator.IRequestContext;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.asterix.translator.SessionConfig;
@@ -150,8 +152,9 @@
compilationProvider, componentProvider);
double duration;
long startTime = System.currentTimeMillis();
- translator.compileAndExecute(hcc, hds,
IStatementExecutor.ResultDelivery.IMMEDIATE,
- null, new IStatementExecutor.Stats());
+ IRequestContext requestContext = new
RequestContext(HttpUtil.getRequestHeaders(request), hcc, hds,
+ IStatementExecutor.ResultDelivery.IMMEDIATE, new
IStatementExecutor.Stats(), null, null, null);
+ translator.compileAndExecute(requestContext);
long endTime = System.currentTimeMillis();
duration = (endTime - startTime) / 1000.00;
out.println(HTML_STATEMENT_SEPARATOR);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 9547514..136dd4a 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -19,11 +19,11 @@
package org.apache.asterix.api.http.server;
+import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
-import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.asterix.algebra.base.ILangExtension;
import org.apache.asterix.app.message.ExecuteStatementRequestMessage;
import org.apache.asterix.app.message.ExecuteStatementResponseMessage;
@@ -41,6 +41,8 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.ipc.exceptions.IPCException;
+import io.netty.handler.codec.http.HttpResponseStatus;
+
/**
* Query service servlet that can run on NC nodes.
* Delegates query execution to CC, then serves the result.
@@ -52,7 +54,7 @@
}
@Override
- protected void executeStatement(String statementsText, SessionOutput
sessionOutput,
+ protected void executeStatement(Map<String, String> httpHeaders, String
statementsText, SessionOutput sessionOutput,
IStatementExecutor.ResultDelivery delivery,
IStatementExecutor.Stats stats, RequestParameters param,
String handleUrl, long[] outExecStartEnd) throws Exception {
// Running on NC -> send 'execute' message to CC
@@ -64,8 +66,9 @@
MessageFuture responseFuture = ncMb.registerMessageFuture();
try {
ExecuteStatementRequestMessage requestMsg =
- new ExecuteStatementRequestMessage(ncCtx.getNodeId(),
responseFuture.getFutureId(), queryLanguage,
- statementsText, sessionOutput.config(),
ccDelivery, param.clientContextID, handleUrl);
+ new ExecuteStatementRequestMessage(httpHeaders,
ncCtx.getNodeId(), responseFuture.getFutureId(),
+ queryLanguage, statementsText,
sessionOutput.config(), ccDelivery, param.clientContextID,
+ handleUrl);
outExecStartEnd[0] = System.nanoTime();
ncMb.sendMessageToCC(requestMsg);
responseMsg = (ExecuteStatementResponseMessage) responseFuture.get(
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 9ee064e..c3ed2f3 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -22,11 +22,13 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.api.http.ctx.RequestContext;
import org.apache.asterix.api.http.servlet.ServletConstants;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.IClusterManagementWork;
@@ -40,6 +42,7 @@
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.translator.IRequestContext;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -90,7 +93,7 @@
@Override
protected void post(IServletRequest request, IServletResponse response) {
try {
- handleRequest(getRequestParameters(request), response);
+ handleRequest(request, response);
} catch (IOException e) {
// Servlet methods should not throw exceptions
// http://cwe.mitre.org/data/definitions/600.html
@@ -387,7 +390,8 @@
return "http://" + host + path + handlePath(delivery);
}
- private void handleRequest(RequestParameters param, IServletResponse
response) throws IOException {
+ private void handleRequest(IServletRequest request, IServletResponse
response) throws IOException {
+ RequestParameters param = getRequestParameters(request);
LOGGER.info(param.toString());
long elapsedStart = System.nanoTime();
final StringWriter stringWriter = new StringWriter();
@@ -415,7 +419,8 @@
throw new AsterixException("Empty request, no statement
provided");
}
String statementsText = param.statement + ";";
- executeStatement(statementsText, sessionOutput, delivery, stats,
param, handleUrl, execStartEnd);
+ executeStatement(HttpUtil.getRequestHeaders(request),
statementsText, sessionOutput, delivery, stats, param,
+ handleUrl, execStartEnd);
if (ResultDelivery.IMMEDIATE == delivery ||
ResultDelivery.DEFERRED == delivery) {
ResultUtil.printStatus(sessionOutput, ResultStatus.SUCCESS);
}
@@ -446,8 +451,9 @@
}
}
- protected void executeStatement(String statementsText, SessionOutput
sessionOutput, ResultDelivery delivery,
- IStatementExecutor.Stats stats, RequestParameters param, String
handleUrl, long[] outExecStartEnd)
+ protected void executeStatement(Map<String, String> httpHeaders, String
statementsText, SessionOutput sessionOutput,
+ ResultDelivery delivery, IStatementExecutor.Stats stats,
RequestParameters param, String handleUrl,
+ long[] outExecStartEnd)
throws Exception {
IClusterManagementWork.ClusterState clusterState =
ClusterStateManager.INSTANCE.getState();
if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
@@ -460,8 +466,10 @@
IStatementExecutor translator =
statementExecutorFactory.create((ICcApplicationContext) appCtx, statements,
sessionOutput, compilationProvider, componentProvider);
outExecStartEnd[0] = System.nanoTime();
- translator.compileAndExecute(getHyracksClientConnection(),
getHyracksDataset(), delivery, null, stats,
- param.clientContextID, queryCtx);
+ IRequestContext requestContext =
+ new RequestContext(httpHeaders, getHyracksClientConnection(),
getHyracksDataset(), delivery, stats,
+ null, param.clientContextID, queryCtx);
+ translator.compileAndExecute(requestContext);
outExecStartEnd[1] = System.nanoTime();
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index 18aae8e..61ee159 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -27,6 +27,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.asterix.api.http.ctx.RequestContext;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.common.config.GlobalConfig;
@@ -39,6 +40,7 @@
import org.apache.asterix.lang.common.base.IParserFactory;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.translator.IRequestContext;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutorFactory;
@@ -173,7 +175,7 @@
response.setHeader("Access-Control-Allow-Headers", "Origin,
X-Requested-With, Content-Type, Accept");
SessionOutput sessionOutput = initResponse(request, response);
QueryTranslator.ResultDelivery resultDelivery =
whichResultDelivery(request);
- doHandle(response, query, sessionOutput, resultDelivery);
+ doHandle(request, response, query, sessionOutput, resultDelivery);
} catch (Exception e) {
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
LOGGER.log(Level.WARNING, "Failure handling request", e);
@@ -181,7 +183,7 @@
}
}
- private void doHandle(IServletResponse response, String query,
SessionOutput sessionOutput,
+ private void doHandle(IServletRequest request, IServletResponse response,
String query, SessionOutput sessionOutput,
ResultDelivery resultDelivery) throws JsonProcessingException {
try {
response.setStatus(HttpResponseStatus.OK);
@@ -203,7 +205,9 @@
MetadataManager.INSTANCE.init();
IStatementExecutor translator =
statementExecutorFactory.create(appCtx, aqlStatements, sessionOutput,
compilationProvider, componentProvider);
- translator.compileAndExecute(hcc, hds, resultDelivery, null, new
IStatementExecutor.Stats());
+ IRequestContext requestContext = new
RequestContext(HttpUtil.getRequestHeaders(request), hcc, hds,
+ IStatementExecutor.ResultDelivery.IMMEDIATE, new
IStatementExecutor.Stats(), null, null, null);
+ translator.compileAndExecute(requestContext);
} catch (AsterixException | TokenMgrError |
org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
index a9d24b9..262fb62 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
@@ -23,7 +23,7 @@
import java.util.List;
import org.apache.asterix.api.common.APIFramework;
-import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.api.http.ctx.RequestContext;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.utils.Job;
@@ -32,6 +32,7 @@
import org.apache.asterix.lang.common.base.IParserFactory;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.translator.IRequestContext;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.asterix.translator.SessionConfig;
@@ -109,8 +110,10 @@
IStatementExecutor translator =
statementExecutorFactory.create(appCtx, statements, output, compilationProvider,
storageComponentProvider);
- translator.compileAndExecute(hcc, null,
QueryTranslator.ResultDelivery.IMMEDIATE,
- null, new IStatementExecutor.Stats());
+ IRequestContext requestContext =
+ new RequestContext(null, hcc, null,
IStatementExecutor.ResultDelivery.IMMEDIATE,
+ new IStatementExecutor.Stats(), null, null, null);
+ translator.compileAndExecute(requestContext);
writer.flush();
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index fc0c1ff..ef5024e 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -22,10 +22,12 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;
+import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.api.http.ctx.RequestContext;
import org.apache.asterix.api.http.server.ResultUtil;
import org.apache.asterix.app.cc.CCExtensionManager;
import org.apache.asterix.common.api.IClusterManagementWork;
@@ -41,6 +43,7 @@
import org.apache.asterix.messaging.CCMessageBroker;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.translator.IRequestContext;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.asterix.translator.IStatementExecutorFactory;
@@ -73,9 +76,11 @@
private final String handleUrl;
- public ExecuteStatementRequestMessage(String requestNodeId, long
requestMessageId, ILangExtension.Language lang,
- String statementsText, SessionConfig sessionConfig,
IStatementExecutor.ResultDelivery delivery,
- String clientContextID, String handleUrl) {
+ private final Map<String, String> httpHeaders;
+
+ public ExecuteStatementRequestMessage(Map<String, String> httpHeaders,
String requestNodeId, long requestMessageId,
+ ILangExtension.Language lang, String statementsText, SessionConfig
sessionConfig,
+ IStatementExecutor.ResultDelivery delivery, String
clientContextID, String handleUrl) {
this.requestNodeId = requestNodeId;
this.requestMessageId = requestMessageId;
this.lang = lang;
@@ -84,6 +89,7 @@
this.delivery = delivery;
this.clientContextID = clientContextID;
this.handleUrl = handleUrl;
+ this.httpHeaders = httpHeaders;
}
@Override
@@ -122,11 +128,11 @@
IStatementExecutor.ResultMetadata outMetadata = new
IStatementExecutor.ResultMetadata();
MetadataManager.INSTANCE.init();
- IStatementExecutor translator =
statementExecutorFactory.create(ccAppCtx, statements, sessionOutput,
- compilationProvider, storageComponentProvider);
- translator.compileAndExecute(ccAppCtx.getHcc(), null,
delivery, outMetadata,
- new IStatementExecutor.Stats(), clientContextID,
statementExecutorContext);
-
+ IStatementExecutor translator = statementExecutorFactory
+ .create(ccAppCtx, statements, sessionOutput,
compilationProvider, storageComponentProvider);
+ IRequestContext requestContext = new
RequestContext(httpHeaders, ccAppCtx.getHcc(), null, delivery,
+ new IStatementExecutor.Stats(), outMetadata,
clientContextID, statementExecutorContext);
+ translator.compileAndExecute(requestContext);
outPrinter.close();
responseMsg.setResult(outWriter.toString());
responseMsg.setMetadata(outMetadata);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 2967a38..98c1596 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -164,6 +164,7 @@
import
org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
import
org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement;
import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
+import org.apache.asterix.translator.IRequestContext;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.asterix.translator.SessionConfig;
@@ -254,15 +255,7 @@
}
@Override
- public void compileAndExecute(IHyracksClientConnection hcc,
IHyracksDataset hdc, ResultDelivery resultDelivery,
- ResultMetadata outMetadata, Stats stats) throws Exception {
- compileAndExecute(hcc, hdc, resultDelivery, outMetadata, stats, null,
null);
- }
-
- @Override
- public void compileAndExecute(IHyracksClientConnection hcc,
IHyracksDataset hdc, ResultDelivery resultDelivery,
- ResultMetadata outMetadata, Stats stats, String clientContextId,
IStatementExecutorContext ctx)
- throws Exception {
+ public void compileAndExecute(IRequestContext requestContext) throws
Exception {
int resultSetIdCounter = 0;
FileSplit outputFile = null;
IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
@@ -274,6 +267,13 @@
*/
String threadName = Thread.currentThread().getName();
Thread.currentThread().setName(QueryTranslator.class.getSimpleName());
+ final IHyracksClientConnection hcc =
requestContext.getHyracksClientConnection();
+ final IHyracksDataset hdc = requestContext.getHyracksDataset();
+ final ResultDelivery resultDelivery =
requestContext.getResultDelivery();
+ final Stats stats = requestContext.getStats();
+ final ResultMetadata outMetadata = requestContext.getOutMetadata();
+ final String clientContextId = requestContext.getClientContextId();
+ final IStatementExecutorContext ctx =
requestContext.getStatementExecutorContext();
try {
for (Statement stmt : statements) {
if (sessionConfig.is(SessionConfig.FORMAT_HTML)) {
@@ -394,8 +394,7 @@
// No op
break;
case Statement.Kind.EXTENSION:
- ((IExtensionStatement) stmt).handle(this,
metadataProvider, hcc, hdc, resultDelivery, stats,
- resultSetIdCounter);
+ handleExtensionMessage(stmt, requestContext,
metadataProvider, resultSetIdCounter);
break;
default:
throw new CompilationException("Unknown function");
@@ -404,6 +403,12 @@
} finally {
Thread.currentThread().setName(threadName);
}
+ }
+
+ protected void handleExtensionMessage(Statement stmt, IRequestContext
requestContext,
+ MetadataProvider metadataProvider, int resultSetIdCounter)
+ throws HyracksDataException, AlgebricksException {
+ ((IExtensionStatement) stmt).handle(requestContext, this,
metadataProvider, resultSetIdCounter);
}
protected void handleSetStatement(Statement stmt, Map<String, String>
config) {
@@ -448,7 +453,6 @@
}
protected void handleCreateDataverseStatement(MetadataProvider
metadataProvider, Statement stmt) throws Exception {
-
CreateDataverseStatement stmtCreateDataverse =
(CreateDataverseStatement) stmt;
String dvName = stmtCreateDataverse.getDataverseName().getValue();
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
@@ -479,8 +483,8 @@
protected static void validateCompactionPolicy(String compactionPolicy,
Map<String, String> compactionPolicyProperties,
MetadataTransactionContext mdTxnCtx,
boolean isExternalDataset) throws CompilationException, Exception {
- CompactionPolicy compactionPolicyEntity =
MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx,
- MetadataConstants.METADATA_DATAVERSE_NAME, compactionPolicy);
+ CompactionPolicy compactionPolicyEntity = MetadataManager.INSTANCE
+ .getCompactionPolicy(mdTxnCtx,
MetadataConstants.METADATA_DATAVERSE_NAME, compactionPolicy);
if (compactionPolicyEntity == null) {
throw new CompilationException("Unknown compaction policy: " +
compactionPolicy);
}
@@ -529,10 +533,11 @@
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
MetadataLockManager.INSTANCE.createDatasetBegin(metadataProvider.getLocks(),
dataverseName,
- itemTypeDataverseName, itemTypeDataverseName + "." +
itemTypeName, metaItemTypeDataverseName,
- metaItemTypeDataverseName + "." + metaItemTypeName,
nodegroupName, compactionPolicy,
- dataverseName + "." + datasetName, defaultCompactionPolicy);
+ MetadataLockManager.INSTANCE
+ .createDatasetBegin(metadataProvider.getLocks(),
dataverseName, itemTypeDataverseName,
+ itemTypeDataverseName + "." + itemTypeName,
metaItemTypeDataverseName,
+ metaItemTypeDataverseName + "." + metaItemTypeName,
nodegroupName, compactionPolicy,
+ dataverseName + "." + datasetName,
defaultCompactionPolicy);
Dataset dataset = null;
try {
IDatasetDetails datasetDetails = null;
@@ -545,13 +550,14 @@
throw new AlgebricksException("A dataset with this name "
+ datasetName + " already exists.");
}
}
- Datatype dt =
MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
- itemTypeDataverseName, itemTypeName);
+ Datatype dt = MetadataManager.INSTANCE
+ .getDatatype(metadataProvider.getMetadataTxnContext(),
itemTypeDataverseName, itemTypeName);
if (dt == null) {
throw new AlgebricksException(": type " + itemTypeName + "
could not be found.");
}
- String ngName = ngNameId != null ? ngNameId.getValue()
- : configureNodegroupForDataset(appCtx, dd.getHints(),
dataverseName, datasetName, metadataProvider);
+ String ngName = ngNameId != null ?
+ ngNameId.getValue() :
+ configureNodegroupForDataset(appCtx, dd.getHints(),
dataverseName, datasetName, metadataProvider);
if (compactionPolicy == null) {
compactionPolicy = GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME;
@@ -581,8 +587,9 @@
((InternalDetailsDecl)
dd.getDatasetDetailsDecl()).getKeySourceIndicators();
boolean autogenerated = ((InternalDetailsDecl)
dd.getDatasetDetailsDecl()).isAutogenerated();
ARecordType aRecordType = (ARecordType) itemType;
- List<IAType> partitioningTypes =
ValidateUtil.validatePartitioningExpressions(aRecordType,
- metaRecType, partitioningExprs,
keySourceIndicators, autogenerated);
+ List<IAType> partitioningTypes = ValidateUtil
+ .validatePartitioningExpressions(aRecordType,
metaRecType, partitioningExprs,
+ keySourceIndicators, autogenerated);
List<String> filterField = ((InternalDetailsDecl)
dd.getDatasetDetailsDecl()).getFilterField();
if (filterField != null) {
@@ -674,14 +681,15 @@
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
-
MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(),
dataverseName,
- datasetName);
+ MetadataManager.INSTANCE
+
.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
datasetName);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e2) {
e.addSuppressed(e2);
abort(e, e2, mdTxnCtx);
- throw new IllegalStateException("System is inconsistent
state: pending dataset(" + dataverseName
- + "." + datasetName + ") couldn't be removed from
the metadata", e);
+ throw new IllegalStateException(
+ "System is inconsistent state: pending dataset(" +
dataverseName + "." + datasetName
+ + ") couldn't be removed from the
metadata", e);
}
}
throw e;
@@ -705,8 +713,9 @@
}
}
if (builder != null) {
- throw new CompilationException("Dataset " +
dataset.getDataverseName() + "." + dataset.getDatasetName()
- + " is currently being " + "fed into by the following
active entities.\n" + builder.toString());
+ throw new CompilationException(
+ "Dataset " + dataset.getDataverseName() + "." +
dataset.getDatasetName() + " is currently being "
+ + "fed into by the following active entities.\n" +
builder.toString());
}
}
@@ -744,8 +753,8 @@
List<Integer> keySourceIndicators =
stmtCreateIndex.getFieldSourceIndicators();
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
MetadataLockManager.INSTANCE.createIndexBegin(metadataProvider.getLocks(),
dataverseName,
- dataverseName + "." + datasetName);
+ MetadataLockManager.INSTANCE
+ .createIndexBegin(metadataProvider.getLocks(), dataverseName,
dataverseName + "." + datasetName);
String indexName = null;
Dataset ds = null;
// For external datasets
@@ -758,8 +767,8 @@
}
indexName = stmtCreateIndex.getIndexName().getValue();
- index =
MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
dataverseName,
- datasetName, indexName);
+ index = MetadataManager.INSTANCE
+ .getIndex(metadataProvider.getMetadataTxnContext(),
dataverseName, datasetName, indexName);
if (index != null) {
if (stmtCreateIndex.getIfNotExists()) {
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -768,13 +777,15 @@
throw new AlgebricksException("An index with this name " +
indexName + " already exists.");
}
}
- Datatype dt =
MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
- ds.getItemTypeDataverseName(), ds.getItemTypeName());
+ Datatype dt = MetadataManager.INSTANCE
+ .getDatatype(metadataProvider.getMetadataTxnContext(),
ds.getItemTypeDataverseName(),
+ ds.getItemTypeName());
ARecordType aRecordType = (ARecordType) dt.getDatatype();
ARecordType metaRecordType = null;
if (ds.hasMetaPart()) {
- Datatype metaDt =
MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
- ds.getMetaItemTypeDataverseName(),
ds.getMetaItemTypeName());
+ Datatype metaDt = MetadataManager.INSTANCE
+ .getDatatype(metadataProvider.getMetadataTxnContext(),
ds.getMetaItemTypeDataverseName(),
+ ds.getMetaItemTypeName());
metaRecordType = (ARecordType) metaDt.getDatatype();
}
@@ -846,17 +857,19 @@
// If it is not a fixed length
if (typeTrait.getFixedLength() < 0) {
- throw new AlgebricksException("The keyword or ngram
index -" + indexName
- + " cannot be created on the dataset -" +
datasetName
- + " due to its variable-length primary key
field - " + partitioningKey);
+ throw new AlgebricksException(
+ "The keyword or ngram index -" + indexName + "
cannot be created on the dataset -"
+ + datasetName + " due to its
variable-length primary key field - "
+ + partitioningKey);
}
}
}
- Index newIndex = new Index(dataverseName, datasetName, indexName,
stmtCreateIndex.getIndexType(),
- indexFields, keySourceIndicators, indexFieldTypes,
stmtCreateIndex.getGramLength(),
- overridesFieldTypes, stmtCreateIndex.isEnforced(), false,
MetadataUtil.PENDING_ADD_OP);
+ Index newIndex =
+ new Index(dataverseName, datasetName, indexName,
stmtCreateIndex.getIndexType(), indexFields,
+ keySourceIndicators, indexFieldTypes,
stmtCreateIndex.getGramLength(), overridesFieldTypes,
+ stmtCreateIndex.isEnforced(), false,
MetadataUtil.PENDING_ADD_OP);
doCreateIndex(hcc, metadataProvider, ds, newIndex, jobFlags);
} finally {
metadataProvider.getLocks().unlock();
@@ -892,18 +905,19 @@
}
// Check if the files index exist
- filesIndex =
MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
- index.getDataverseName(), index.getDatasetName(),
-
IndexingConstants.getFilesIndexName(index.getDatasetName()));
+ filesIndex = MetadataManager.INSTANCE
+ .getIndex(metadataProvider.getMetadataTxnContext(),
index.getDataverseName(),
+ index.getDatasetName(),
IndexingConstants.getFilesIndexName(index.getDatasetName()));
firstExternalDatasetIndex = filesIndex == null;
// Lock external dataset
ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds,
firstExternalDatasetIndex);
datasetLocked = true;
if (firstExternalDatasetIndex) {
// Verify that no one has created an index before we
acquire the lock
- filesIndex =
MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
- index.getDataverseName(), index.getDatasetName(),
-
IndexingConstants.getFilesIndexName(index.getDatasetName()));
+ filesIndex = MetadataManager.INSTANCE
+
.getIndex(metadataProvider.getMetadataTxnContext(), index.getDataverseName(),
+ index.getDatasetName(),
+
IndexingConstants.getFilesIndexName(index.getDatasetName()));
if (filesIndex != null) {
ExternalDatasetsRegistry.INSTANCE.buildIndexEnd(ds,
firstExternalDatasetIndex);
firstExternalDatasetIndex = false;
@@ -925,8 +939,8 @@
MetadataManager.INSTANCE.addExternalFile(mdTxnCtx,
file);
}
// This is the first index for the external dataset,
replicate the files index
- spec =
ExternalIndexingOperations.buildFilesIndexCreateJobSpec(ds,
externalFilesSnapshot,
- metadataProvider);
+ spec = ExternalIndexingOperations
+ .buildFilesIndexCreateJobSpec(ds,
externalFilesSnapshot, metadataProvider);
if (spec == null) {
throw new CompilationException(
"Failed to create job spec for replicating
Files Index For external dataset");
@@ -938,16 +952,17 @@
// check whether there exists another enforced index on the same
field
if (index.isEnforced()) {
- List<Index> indexes =
MetadataManager.INSTANCE.getDatasetIndexes(
- metadataProvider.getMetadataTxnContext(),
index.getDataverseName(), index.getDatasetName());
+ List<Index> indexes = MetadataManager.INSTANCE
+
.getDatasetIndexes(metadataProvider.getMetadataTxnContext(),
index.getDataverseName(),
+ index.getDatasetName());
for (Index existingIndex : indexes) {
- if
(existingIndex.getKeyFieldNames().equals(index.getKeyFieldNames())
- &&
!existingIndex.getKeyFieldTypes().equals(index.getKeyFieldTypes())
- && existingIndex.isEnforced()) {
- throw new CompilationException("Cannot create index "
+ index.getIndexName()
- + " , enforced index " +
existingIndex.getIndexName() + " on field \""
- + StringUtils.join(index.getKeyFieldNames(),
',') + "\" is already defined with type \""
- + existingIndex.getKeyFieldTypes() + "\"");
+ if
(existingIndex.getKeyFieldNames().equals(index.getKeyFieldNames()) &&
!existingIndex
+
.getKeyFieldTypes().equals(index.getKeyFieldTypes()) &&
existingIndex.isEnforced()) {
+ throw new CompilationException(
+ "Cannot create index " + index.getIndexName()
+ " , enforced index " + existingIndex
+ .getIndexName() + " on field \"" +
StringUtils
+ .join(index.getKeyFieldNames(), ',') +
"\" is already defined with type \""
+ + existingIndex.getKeyFieldTypes() +
"\"");
}
}
}
@@ -956,8 +971,9 @@
// #. prepare to create the index artifact in NC.
spec = IndexUtil.buildSecondaryIndexCreationJobSpec(ds, index,
metadataProvider);
if (spec == null) {
- throw new CompilationException("Failed to create job spec for
creating index '" + ds.getDatasetName()
- + "." + index.getIndexName() + "'");
+ throw new CompilationException(
+ "Failed to create job spec for creating index '" +
ds.getDatasetName() + "." + index
+ .getIndexName() + "'");
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
@@ -1056,39 +1072,42 @@
abort(e, e2, mdTxnCtx);
throw new IllegalStateException(
"System is inconsistent state: pending files
for(" + index.getDataverseName() + "."
- + index.getDatasetName() + ") couldn't
be removed from the metadata",
- e);
+ + index.getDatasetName() + ") couldn't
be removed from the metadata", e);
}
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
// Drop the files index from metadata
-
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
- index.getDataverseName(),
index.getDatasetName(),
-
IndexingConstants.getFilesIndexName(index.getDatasetName()));
+ MetadataManager.INSTANCE
+
.dropIndex(metadataProvider.getMetadataTxnContext(), index.getDataverseName(),
+ index.getDatasetName(),
+
IndexingConstants.getFilesIndexName(index.getDatasetName()));
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e2) {
e.addSuppressed(e2);
abort(e, e2, mdTxnCtx);
- throw new IllegalStateException("System is
inconsistent state: pending index("
- + index.getDataverseName() + "." +
index.getDatasetName() + "."
- +
IndexingConstants.getFilesIndexName(index.getDatasetName())
- + ") couldn't be removed from the metadata",
e);
+ throw new IllegalStateException(
+ "System is inconsistent state: pending index("
+ index.getDataverseName() + "." + index
+ .getDatasetName() + "." +
IndexingConstants
+
.getFilesIndexName(index.getDatasetName())
+ + ") couldn't be removed from the
metadata", e);
}
}
// remove the record from the metadata.
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
-
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
- index.getDataverseName(), index.getDatasetName(),
index.getIndexName());
+ MetadataManager.INSTANCE
+
.dropIndex(metadataProvider.getMetadataTxnContext(), index.getDataverseName(),
+ index.getDatasetName(),
index.getIndexName());
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e2) {
e.addSuppressed(e2);
abort(e, e2, mdTxnCtx);
- throw new IllegalStateException("System is in inconsistent
state: pending index("
- + index.getDataverseName() + "." +
index.getDatasetName() + "." + index.getIndexName()
- + ") couldn't be removed from the metadata", e);
+ throw new IllegalStateException(
+ "System is in inconsistent state: pending index("
+ index.getDataverseName() + "." + index
+ .getDatasetName() + "." +
index.getIndexName()
+ + ") couldn't be removed from the
metadata", e);
}
}
throw e;
@@ -1112,8 +1131,8 @@
String typeName = stmtCreateType.getIdent().getValue();
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
MetadataLockManager.INSTANCE.createTypeBegin(metadataProvider.getLocks(),
dataverseName,
- dataverseName + "." + typeName);
+ MetadataLockManager.INSTANCE
+ .createTypeBegin(metadataProvider.getLocks(), dataverseName,
dataverseName + "." + typeName);
try {
Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx,
dataverseName);
if (dv == null) {
@@ -1128,8 +1147,9 @@
if (BuiltinTypeMap.getBuiltinType(typeName) != null) {
throw new AlgebricksException("Cannot redefine builtin
type " + typeName + ".");
} else {
- Map<TypeSignature, IAType> typeMap =
TypeTranslator.computeTypes(mdTxnCtx,
- stmtCreateType.getTypeDef(),
stmtCreateType.getIdent().getValue(), dataverseName);
+ Map<TypeSignature, IAType> typeMap = TypeTranslator
+ .computeTypes(mdTxnCtx,
stmtCreateType.getTypeDef(), stmtCreateType.getIdent().getValue(),
+ dataverseName);
TypeSignature typeSignature = new
TypeSignature(dataverseName, typeName);
IAType type = typeMap.get(typeSignature);
MetadataManager.INSTANCE.addDatatype(mdTxnCtx, new
Datatype(dataverseName, typeName, type, false));
@@ -1179,8 +1199,8 @@
tempMdProvider.setConfig(metadataProvider.getConfig());
for (IActiveEntityEventsListener listener : activeListeners) {
EntityId activeEntityId = listener.getEntityId();
- if
(activeEntityId.getExtensionName().equals(Feed.EXTENSION_NAME)
- &&
activeEntityId.getDataverse().equals(dataverseName)) {
+ if
(activeEntityId.getExtensionName().equals(Feed.EXTENSION_NAME) &&
activeEntityId.getDataverse()
+ .equals(dataverseName)) {
tempMdProvider.getLocks().reset();
stopFeedBeforeDelete(new Pair<>(dvId, new
Identifier(activeEntityId.getEntityName())),
tempMdProvider);
@@ -1207,8 +1227,8 @@
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
datasetName);
for (int k = 0; k < indexes.size(); k++) {
if
(ExternalIndexingOperations.isFileIndex(indexes.get(k))) {
- jobsToExecute.add(
-
ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider,
dataset));
+ jobsToExecute.add(ExternalIndexingOperations
+
.buildDropFilesIndexJobSpec(metadataProvider, dataset));
} else {
jobsToExecute
.add(IndexUtil.buildDropIndexJobSpec(indexes.get(k), metadataProvider,
dataset));
@@ -1314,8 +1334,8 @@
DropDatasetStatement stmtDelete = (DropDatasetStatement) stmt;
String dataverseName =
getActiveDataverse(stmtDelete.getDataverseName());
String datasetName = stmtDelete.getDatasetName().getValue();
-
MetadataLockManager.INSTANCE.dropDatasetBegin(metadataProvider.getLocks(),
dataverseName,
- dataverseName + "." + datasetName);
+ MetadataLockManager.INSTANCE
+ .dropDatasetBegin(metadataProvider.getLocks(), dataverseName,
dataverseName + "." + datasetName);
try {
doDropDataset(dataverseName, datasetName, metadataProvider,
stmtDelete.getIfExists(), hcc, true);
} finally {
@@ -1338,8 +1358,9 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
return;
} else {
- throw new AlgebricksException("There is no dataset with
this name " + datasetName + " in dataverse "
- + dataverseName + ".");
+ throw new AlgebricksException(
+ "There is no dataset with this name " +
datasetName + " in dataverse " + dataverseName
+ + ".");
}
}
ds.drop(metadataProvider, mdTxnCtx, jobsToExecute, bActiveTxn,
progress, hcc, dropCorrespondingNodeGroup);
@@ -1365,14 +1386,15 @@
mdTxnCtx.setValue(MetadataManager.INSTANCE.beginTransaction());
metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue());
try {
-
MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(),
dataverseName,
- datasetName);
+ MetadataManager.INSTANCE
+
.dropDataset(metadataProvider.getMetadataTxnContext(), dataverseName,
datasetName);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
} catch (Exception e2) {
e.addSuppressed(e2);
abort(e, e2, mdTxnCtx.getValue());
- throw new IllegalStateException("System is inconsistent
state: pending dataset(" + dataverseName
- + "." + datasetName + ") couldn't be removed from
the metadata", e);
+ throw new IllegalStateException(
+ "System is inconsistent state: pending dataset(" +
dataverseName + "." + datasetName
+ + ") couldn't be removed from the
metadata", e);
}
}
throw e;
@@ -1392,8 +1414,8 @@
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
List<JobSpecification> jobsToExecute = new ArrayList<>();
-
MetadataLockManager.INSTANCE.dropIndexBegin(metadataProvider.getLocks(),
dataverseName,
- dataverseName + "." + datasetName);
+ MetadataLockManager.INSTANCE
+ .dropIndexBegin(metadataProvider.getLocks(), dataverseName,
dataverseName + "." + datasetName);
String indexName = null;
// For external index
boolean dropFilesIndex = false;
@@ -1416,8 +1438,9 @@
}
}
if (builder != null) {
- throw new CompilationException("Dataset" + datasetName
- + " is currently being fed into by the following
active entities: " + builder.toString());
+ throw new CompilationException(
+ "Dataset" + datasetName + " is currently being fed
into by the following active entities: "
+ + builder.toString());
}
if (ds.getDatasetType() == DatasetType.INTERNAL) {
@@ -1484,8 +1507,8 @@
jobsToExecute
.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider,
ds));
// #. mark PendingDropOp on the existing files
index
- MetadataManager.INSTANCE.dropIndex(mdTxnCtx,
dataverseName, datasetName,
- externalIndex.getIndexName());
+ MetadataManager.INSTANCE
+ .dropIndex(mdTxnCtx, dataverseName,
datasetName, externalIndex.getIndexName());
MetadataManager.INSTANCE.addIndex(mdTxnCtx,
new Index(dataverseName, datasetName,
externalIndex.getIndexName(),
externalIndex.getIndexType(),
externalIndex.getKeyFieldNames(),
@@ -1551,18 +1574,20 @@
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
try {
-
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
dataverseName,
- datasetName, indexName);
+ MetadataManager.INSTANCE
+
.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
datasetName, indexName);
if (dropFilesIndex) {
-
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(),
dataverseName,
- datasetName,
IndexingConstants.getFilesIndexName(datasetName));
+ MetadataManager.INSTANCE
+
.dropIndex(metadataProvider.getMetadataTxnContext(), dataverseName, datasetName,
+
IndexingConstants.getFilesIndexName(datasetName));
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e2) {
e.addSuppressed(e2);
abort(e, e2, mdTxnCtx);
- throw new IllegalStateException("System is inconsistent
state: pending index(" + dataverseName + "."
- + datasetName + "." + indexName + ") couldn't be
removed from the metadata", e);
+ throw new IllegalStateException(
+ "System is inconsistent state: pending index(" +
dataverseName + "." + datasetName + "."
+ + indexName + ") couldn't be removed from
the metadata", e);
}
}
@@ -1582,8 +1607,8 @@
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
MetadataLockManager.INSTANCE.dropTypeBegin(metadataProvider.getLocks(),
dataverseName,
- dataverseName + "." + typeName);
+ MetadataLockManager.INSTANCE
+ .dropTypeBegin(metadataProvider.getLocks(), dataverseName,
dataverseName + "." + typeName);
try {
Datatype dt = MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
dataverseName, typeName);
if (dt == null) {
@@ -1635,17 +1660,18 @@
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
MetadataLockManager.INSTANCE.functionStatementBegin(metadataProvider.getLocks(),
dataverse,
- dataverse + "." + functionName);
+ MetadataLockManager.INSTANCE
+ .functionStatementBegin(metadataProvider.getLocks(),
dataverse, dataverse + "." + functionName);
try {
Dataverse dv = MetadataManager.INSTANCE.getDataverse(mdTxnCtx,
dataverse);
if (dv == null) {
throw new AlgebricksException("There is no dataverse with this
name " + dataverse + ".");
}
// If the function body contains function calls, theirs reference
count won't be increased.
- Function function = new Function(dataverse, functionName,
cfs.getaAterixFunction().getArity(),
- cfs.getParamList(), Function.RETURNTYPE_VOID,
cfs.getFunctionBody(), Function.LANGUAGE_AQL,
- FunctionKind.SCALAR.toString(), 0);
+ Function function =
+ new Function(dataverse, functionName,
cfs.getaAterixFunction().getArity(), cfs.getParamList(),
+ Function.RETURNTYPE_VOID, cfs.getFunctionBody(),
Function.LANGUAGE_AQL,
+ FunctionKind.SCALAR.toString(), 0);
MetadataManager.INSTANCE.addFunction(mdTxnCtx, function);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1693,8 +1719,8 @@
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
MetadataLockManager.INSTANCE.modifyDatasetBegin(metadataProvider.getLocks(),
dataverseName,
- dataverseName + "." + datasetName);
+ MetadataLockManager.INSTANCE
+ .modifyDatasetBegin(metadataProvider.getLocks(),
dataverseName, dataverseName + "." + datasetName);
try {
CompiledLoadFromFileStatement cls =
new CompiledLoadFromFileStatement(dataverseName,
loadStmt.getDatasetName().getValue(),
@@ -1861,8 +1887,8 @@
String feedName = cfs.getFeedName().getValue();
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
MetadataLockManager.INSTANCE.createFeedBegin(metadataProvider.getLocks(),
dataverseName,
- dataverseName + "." + feedName);
+ MetadataLockManager.INSTANCE
+ .createFeedBegin(metadataProvider.getLocks(), dataverseName,
dataverseName + "." + feedName);
Feed feed = null;
try {
feed =
MetadataManager.INSTANCE.getFeed(metadataProvider.getMetadataTxnContext(),
dataverseName, feedName);
@@ -1896,8 +1922,8 @@
CreateFeedPolicyStatement cfps = (CreateFeedPolicyStatement) stmt;
dataverse = getActiveDataverse(null);
policy = cfps.getPolicyName();
-
MetadataLockManager.INSTANCE.createFeedPolicyBegin(metadataProvider.getLocks(),
dataverse,
- dataverse + "." + policy);
+ MetadataLockManager.INSTANCE
+ .createFeedPolicyBegin(metadataProvider.getLocks(), dataverse,
dataverse + "." + policy);
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1957,8 +1983,8 @@
String feedName = stmtFeedDrop.getFeedName().getValue();
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
MetadataLockManager.INSTANCE.dropFeedBegin(metadataProvider.getLocks(),
dataverseName,
- dataverseName + "." + feedName);
+ MetadataLockManager.INSTANCE
+ .dropFeedBegin(metadataProvider.getLocks(), dataverseName,
dataverseName + "." + feedName);
try {
Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx,
dataverseName, feedName);
if (feed == null) {
@@ -1975,8 +2001,9 @@
ActiveEntityEventsListener listener =
(ActiveEntityEventsListener)
activeEventHandler.getActiveEntityListener(feedId);
if (listener != null) {
- throw new AlgebricksException("Feed " + feedId
- + " is currently active and connected to the following
dataset(s) \n" + listener.toString());
+ throw new AlgebricksException(
+ "Feed " + feedId + " is currently active and connected
to the following dataset(s) \n"
+ + listener.toString());
} else {
JobSpecification spec =
FeedOperations.buildRemoveFeedStorageJob(metadataProvider,
MetadataManager.INSTANCE.getFeed(mdTxnCtx,
feedId.getDataverse(), feedId.getEntityName()));
@@ -2003,8 +2030,8 @@
FeedPolicyDropStatement stmtFeedPolicyDrop = (FeedPolicyDropStatement)
stmt;
String dataverseName =
getActiveDataverse(stmtFeedPolicyDrop.getDataverseName());
String policyName = stmtFeedPolicyDrop.getPolicyName().getValue();
-
MetadataLockManager.INSTANCE.dropFeedPolicyBegin(metadataProvider.getLocks(),
dataverseName,
- dataverseName + "." + policyName);
+ MetadataLockManager.INSTANCE
+ .dropFeedPolicyBegin(metadataProvider.getLocks(),
dataverseName, dataverseName + "." + policyName);
try {
FeedPolicyEntity feedPolicy =
MetadataManager.INSTANCE.getFeedPolicy(mdTxnCtx, dataverseName, policyName);
if (feedPolicy == null) {
@@ -2035,8 +2062,8 @@
// Runtime handler
EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName,
feedName);
// Feed & Feed Connections
- Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName,
feedName,
- metadataProvider.getMetadataTxnContext());
+ Feed feed = FeedMetadataUtil
+ .validateIfFeedExists(dataverseName, feedName,
metadataProvider.getMetadataTxnContext());
List<FeedConnection> feedConnections = MetadataManager.INSTANCE
.getFeedConections(metadataProvider.getMetadataTxnContext(),
dataverseName, feedName);
ILangCompilationProvider compilationProvider = new
AqlCompilationProvider();
@@ -2044,14 +2071,15 @@
DefaultStatementExecutorFactory qtFactory = new
DefaultStatementExecutorFactory();
ActiveLifecycleListener activeListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
ActiveJobNotificationHandler activeEventHandler =
activeListener.getNotificationHandler();
- ActiveEntityEventsListener listener = (ActiveEntityEventsListener)
activeEventHandler
- .getActiveEntityListener(entityId);
+ ActiveEntityEventsListener listener =
+ (ActiveEntityEventsListener)
activeEventHandler.getActiveEntityListener(entityId);
if (listener != null) {
throw new AlgebricksException("Feed " + feedName + " is started
already.");
}
// Start
-
MetadataLockManager.INSTANCE.startFeedBegin(metadataProvider.getLocks(),
dataverseName,
- dataverseName + "." + feedName, feedConnections);
+ MetadataLockManager.INSTANCE
+ .startFeedBegin(metadataProvider.getLocks(), dataverseName,
dataverseName + "." + feedName,
+ feedConnections);
try {
// Prepare policy
List<IDataset> datasets = new ArrayList<>();
@@ -2133,20 +2161,21 @@
// Check whether feed is alive
ActiveLifecycleListener activeListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
ActiveJobNotificationHandler activeEventHandler =
activeListener.getNotificationHandler();
- if (activeEventHandler
- .getActiveEntityListener(new EntityId(Feed.EXTENSION_NAME,
dataverseName, feedName)) != null) {
+ if (activeEventHandler.getActiveEntityListener(new
EntityId(Feed.EXTENSION_NAME, dataverseName, feedName))
+ != null) {
throw new
CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED,
feedName);
}
// Transaction handling
-
MetadataLockManager.INSTANCE.connectFeedBegin(metadataProvider.getLocks(),
dataverseName,
- dataverseName + "." + datasetName, dataverseName + "." +
feedName);
+ MetadataLockManager.INSTANCE
+ .connectFeedBegin(metadataProvider.getLocks(), dataverseName,
dataverseName + "." + datasetName,
+ dataverseName + "." + feedName);
try {
// validation
FeedMetadataUtil.validateIfDatasetExists(metadataProvider,
dataverseName, datasetName, mdTxnCtx);
- Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName,
feedName,
- metadataProvider.getMetadataTxnContext());
- ARecordType outputType = FeedMetadataUtil.getOutputType(feed,
feed.getAdapterConfiguration(),
- ExternalDataConstants.KEY_TYPE_NAME);
+ Feed feed = FeedMetadataUtil
+ .validateIfFeedExists(dataverseName, feedName,
metadataProvider.getMetadataTxnContext());
+ ARecordType outputType = FeedMetadataUtil
+ .getOutputType(feed, feed.getAdapterConfiguration(),
ExternalDataConstants.KEY_TYPE_NAME);
List<FunctionSignature> appliedFunctions =
cfs.getAppliedFunctions();
for (FunctionSignature func : appliedFunctions) {
if (MetadataManager.INSTANCE.getFunction(mdTxnCtx, func) ==
null) {
@@ -2154,8 +2183,8 @@
func.getName());
}
}
- fc =
MetadataManager.INSTANCE.getFeedConnection(metadataProvider.getMetadataTxnContext(),
dataverseName,
- feedName, datasetName);
+ fc = MetadataManager.INSTANCE
+
.getFeedConnection(metadataProvider.getMetadataTxnContext(), dataverseName,
feedName, datasetName);
if (fc != null) {
throw new AlgebricksException("Feed" + feedName + " is already
connected dataset " + datasetName);
}
@@ -2188,21 +2217,23 @@
ActiveLifecycleListener activeListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
ActiveJobNotificationHandler activeEventHandler =
activeListener.getNotificationHandler();
// Check whether feed is alive
- if (activeEventHandler
- .getActiveEntityListener(new EntityId(Feed.EXTENSION_NAME,
dataverseName, feedName)) != null) {
+ if (activeEventHandler.getActiveEntityListener(new
EntityId(Feed.EXTENSION_NAME, dataverseName, feedName))
+ != null) {
throw new
CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED,
feedName);
}
-
MetadataLockManager.INSTANCE.disconnectFeedBegin(metadataProvider.getLocks(),
dataverseName,
- dataverseName + "." + datasetName, dataverseName + "." +
cfs.getFeedName());
+ MetadataLockManager.INSTANCE
+ .disconnectFeedBegin(metadataProvider.getLocks(),
dataverseName, dataverseName + "." + datasetName,
+ dataverseName + "." + cfs.getFeedName());
try {
FeedMetadataUtil.validateIfDatasetExists(metadataProvider,
dataverseName, cfs.getDatasetName().getValue(),
mdTxnCtx);
FeedMetadataUtil.validateIfFeedExists(dataverseName,
cfs.getFeedName().getValue(), mdTxnCtx);
- FeedConnection fc =
MetadataManager.INSTANCE.getFeedConnection(metadataProvider.getMetadataTxnContext(),
- dataverseName, feedName, datasetName);
+ FeedConnection fc = MetadataManager.INSTANCE
+
.getFeedConnection(metadataProvider.getMetadataTxnContext(), dataverseName,
feedName, datasetName);
if (fc == null) {
- throw new CompilationException("Feed " + feedName + " is
currently not connected to "
- + cfs.getDatasetName().getValue() + ". Invalid
operation!");
+ throw new CompilationException(
+ "Feed " + feedName + " is currently not connected to "
+ cfs.getDatasetName().getValue()
+ + ". Invalid operation!");
}
MetadataManager.INSTANCE.dropFeedConnection(mdTxnCtx,
dataverseName, feedName, datasetName);
for (FunctionSignature functionSignature :
fc.getAppliedFunctions()) {
@@ -2228,8 +2259,8 @@
boolean bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
List<JobSpecification> jobsToExecute = new ArrayList<>();
- MetadataLockManager.INSTANCE.compactBegin(metadataProvider.getLocks(),
dataverseName,
- dataverseName + "." + datasetName);
+ MetadataLockManager.INSTANCE
+ .compactBegin(metadataProvider.getLocks(), dataverseName,
dataverseName + "." + datasetName);
try {
Dataset ds = metadataProvider.findDataset(dataverseName,
datasetName);
if (ds == null) {
@@ -2339,8 +2370,9 @@
switch (resultDelivery) {
case ASYNC:
MutableBoolean printed = new MutableBoolean(false);
- executorService.submit(() -> asyncCreateAndRunJob(hcc,
compiler, locker, resultDelivery,
- clientContextId, ctx, resultSetId, printed));
+ executorService
+ .submit(() -> asyncCreateAndRunJob(hcc, compiler,
locker, resultDelivery, clientContextId, ctx,
+ resultSetId, printed));
synchronized (printed) {
while (!printed.booleanValue()) {
printed.wait();
@@ -2388,8 +2420,9 @@
ResultUtil.printStatus(sessionOutput,
AbstractQueryApiServlet.ResultStatus.FAILED);
ResultUtil.printError(sessionOutput.out(), e);
} else {
- GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE,
- resultDelivery.name() + " job with id " +
jobId.getValue() + " " + "failed", e);
+ GlobalConfig.ASTERIX_LOGGER
+ .log(Level.SEVERE, resultDelivery.name() + " job with
id " + jobId.getValue() + " " + "failed",
+ e);
}
} finally {
synchronized (printed) {
@@ -2491,8 +2524,8 @@
Dataset transactionDataset = null;
boolean lockAquired = false;
boolean success = false;
-
MetadataLockManager.INSTANCE.refreshDatasetBegin(metadataProvider.getLocks(),
dataverseName,
- dataverseName + "." + datasetName);
+ MetadataLockManager.INSTANCE
+ .refreshDatasetBegin(metadataProvider.getLocks(),
dataverseName, dataverseName + "." + datasetName);
try {
ds = metadataProvider.findDataset(dataverseName, datasetName);
// Dataset exists ?
@@ -2527,8 +2560,8 @@
// Compute delta
// Now we compare snapshot with external file system
- if (ExternalIndexingOperations.isDatasetUptodate(ds,
metadataFiles, addedFiles, deletedFiles,
- appendedFiles)) {
+ if (ExternalIndexingOperations
+ .isDatasetUptodate(ds, metadataFiles, addedFiles,
deletedFiles, appendedFiles)) {
((ExternalDatasetDetails)
ds.getDatasetDetails()).setRefreshTimestamp(txnTime);
MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -2556,8 +2589,8 @@
}
// Create the files index update job
- spec = ExternalIndexingOperations.buildFilesIndexUpdateOp(ds,
metadataFiles, addedFiles, appendedFiles,
- metadataProvider);
+ spec = ExternalIndexingOperations
+ .buildFilesIndexUpdateOp(ds, metadataFiles, addedFiles,
appendedFiles, metadataProvider);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
@@ -2568,8 +2601,8 @@
for (Index index : indexes) {
if (!ExternalIndexingOperations.isFileIndex(index)) {
- spec = ExternalIndexingOperations.buildIndexUpdateOp(ds,
index, metadataFiles, addedFiles,
- appendedFiles, metadataProvider);
+ spec = ExternalIndexingOperations
+ .buildIndexUpdateOp(ds, index, metadataFiles,
addedFiles, appendedFiles, metadataProvider);
// run the files update job
runJob(hcc, spec);
}
@@ -2642,8 +2675,9 @@
abort(e, e, mdTxnCtx);
}
if (transactionState == TransactionState.READY_TO_COMMIT) {
- throw new IllegalStateException("System is inconsistent state:
commit of (" + dataverseName + "."
- + datasetName + ") refresh couldn't carry out the
commit phase", e);
+ throw new IllegalStateException(
+ "System is inconsistent state: commit of (" +
dataverseName + "." + datasetName
+ + ") refresh couldn't carry out the commit
phase", e);
}
if (transactionState == TransactionState.COMMIT) {
// Nothing to do , everything should be clean
@@ -2742,8 +2776,9 @@
}
// Constructs the pregelix command line.
- List<String> cmd = constructPregelixCommand(pregelixStmt,
dataverseNameFrom, datasetNameFrom,
- dataverseNameTo, datasetNameTo);
+ List<String> cmd =
+ constructPregelixCommand(pregelixStmt, dataverseNameFrom,
datasetNameFrom, dataverseNameTo,
+ datasetNameTo);
ProcessBuilder pb = new ProcessBuilder(cmd);
pb.directory(new File(pregelixHome));
pb.redirectErrorStream(true);
@@ -2778,8 +2813,9 @@
// Validates the source/sink dataverses and datasets.
Dataset fromDataset = metadataProvider.findDataset(dataverseNameFrom,
datasetNameFrom);
if (fromDataset == null) {
- throw new CompilationException("The source dataset " +
datasetNameFrom + " in dataverse "
- + dataverseNameFrom + " could not be found for the Run
command");
+ throw new CompilationException(
+ "The source dataset " + datasetNameFrom + " in dataverse "
+ dataverseNameFrom
+ + " could not be found for the Run command");
}
Dataset toDataset = metadataProvider.findDataset(dataverseNameTo,
datasetNameTo);
if (toDataset == null) {
@@ -2790,8 +2826,8 @@
try {
// Find the primary index of the sink dataset.
Index toIndex = null;
- List<Index> indexes =
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameTo,
- pregelixStmt.getDatasetNameTo().getValue());
+ List<Index> indexes = MetadataManager.INSTANCE
+ .getDatasetIndexes(mdTxnCtx, dataverseNameTo,
pregelixStmt.getDatasetNameTo().getValue());
for (Index index : indexes) {
if (index.isPrimaryIndex()) {
toIndex = index;
@@ -2805,15 +2841,18 @@
DropDatasetStatement dropStmt =
new DropDatasetStatement(new Identifier(dataverseNameTo),
pregelixStmt.getDatasetNameTo(), true);
this.handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
- IDatasetDetailsDecl idd = new
InternalDetailsDecl(toIndex.getKeyFieldNames(),
- toIndex.getKeyFieldSourceIndicators(), false, null,
toDataset.getDatasetDetails().isTemp());
- DatasetDecl createToDataset = new DatasetDecl(new
Identifier(dataverseNameTo),
- pregelixStmt.getDatasetNameTo(), new
Identifier(toDataset.getItemTypeDataverseName()),
- new Identifier(toDataset.getItemTypeName()),
- new Identifier(toDataset.getMetaItemTypeDataverseName()),
- new Identifier(toDataset.getMetaItemTypeName()), new
Identifier(toDataset.getNodeGroupName()),
- toDataset.getCompactionPolicy(),
toDataset.getCompactionPolicyProperties(), toDataset.getHints(),
- toDataset.getDatasetType(), idd, false);
+ IDatasetDetailsDecl idd =
+ new InternalDetailsDecl(toIndex.getKeyFieldNames(),
toIndex.getKeyFieldSourceIndicators(), false,
+ null, toDataset.getDatasetDetails().isTemp());
+ DatasetDecl createToDataset =
+ new DatasetDecl(new Identifier(dataverseNameTo),
pregelixStmt.getDatasetNameTo(),
+ new
Identifier(toDataset.getItemTypeDataverseName()),
+ new Identifier(toDataset.getItemTypeName()),
+ new
Identifier(toDataset.getMetaItemTypeDataverseName()),
+ new Identifier(toDataset.getMetaItemTypeName()),
+ new Identifier(toDataset.getNodeGroupName()),
toDataset.getCompactionPolicy(),
+ toDataset.getCompactionPolicyProperties(),
toDataset.getHints(), toDataset.getDatasetType(),
+ idd, false);
this.handleCreateDatasetStatement(metadataProvider,
createToDataset, hcc);
} catch (Exception e) {
LOGGER.log(Level.WARNING, e.getMessage(), e);
@@ -2862,8 +2901,9 @@
ExternalProperties externalProperties = appCtx.getExternalProperties();
String clientIP =
ClusterProperties.INSTANCE.getCluster().getMasterNode().getClientIp();
StringBuilder asterixdbParameterBuilder = new StringBuilder();
- asterixdbParameterBuilder.append(
- "pregelix.asterixdb.url=" + "http://" + clientIP + ":" +
externalProperties.getAPIServerPort() + ",");
+ asterixdbParameterBuilder
+ .append("pregelix.asterixdb.url=" + "http://" + clientIP + ":"
+ externalProperties.getAPIServerPort()
+ + ",");
asterixdbParameterBuilder.append("pregelix.asterixdb.source=true,");
asterixdbParameterBuilder.append("pregelix.asterixdb.sink=true,");
asterixdbParameterBuilder.append("pregelix.asterixdb.input.dataverse="
+ fromDataverseName + ",");
@@ -2911,8 +2951,8 @@
asterixdbParameterBuilder.append(inputConverterClassKey +
inputConverterClassValue);
asterixdbParameterBuilder.append(outputConverterClassKey +
outputConverterClassValue);
// Remove the last comma.
-
asterixdbParameterBuilder.delete(asterixdbParameterBuilder.length() - 1,
- asterixdbParameterBuilder.length());
+ asterixdbParameterBuilder
+ .delete(asterixdbParameterBuilder.length() - 1,
asterixdbParameterBuilder.length());
cmds.add(asterixdbParameterBuilder.toString());
}
return cmds;
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
index c11deef..788b6aa 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -87,6 +88,14 @@
response.setHeader(HttpHeaderNames.CONTENT_TYPE, type);
}
+    public static Map<String, String> getRequestHeaders(IServletRequest request) {
+        Map<String, String> result = new HashMap<>();
+        for (Map.Entry<String, String> header : request.getHttpRequest().headers()) {
+            result.put(header.getKey(), header.getValue()); // on a repeated name the last value put wins
+        }
+        return result;
+    }
+
/**
* Get the mime string representation from the extension
*
--
To view, visit https://asterix-gerrit.ics.uci.edu/1867
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie918f4d3f8dae41d07536041c591c59946a077f4
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>