Till Westmann has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1709
Change subject: Enable HTTP API processing on NCs
......................................................................
Enable HTTP API processing on NCs
- Query/Status/Result are answered by NC nodes
- other HTTP requests are proxied to the CC node
- SessionConfig refactoring – split into config and output (SessionOutput)
- TestExecutor now can send http requests do multiple nodes (round robin)
Change-Id: I19414a23e163fc4deef9805c8f9089609f1ebe07
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
A
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
A
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
A
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
M
asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
M
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
A
hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ProxyHandler.java
36 files changed, 1,030 insertions(+), 337 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/09/1709/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
index b0f863b..d822439 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
@@ -22,7 +22,6 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
-import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
@@ -42,14 +41,14 @@
* Called when the {@code IStatementExecutor} encounters an extension
statement.
* An implementation class should implement the actual processing of the
statement in this method.
*
- * @param queryTranslator
- * @param metadataProvider
* @param statementExecutor
+ * @param metadataProvider
* @param hcc
+ * @param outMetadata
* @param resultSetIdCounter
* @throws Exception
*/
- void handle(IStatementExecutor statementExecutor, MetadataProvider
metadataProvider,
- IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery
resultDelivery, Stats stats,
+ void handle(IStatementExecutor statementExecutor, MetadataProvider
metadataProvider, IHyracksClientConnection hcc,
+ IHyracksDataset hdc, ResultDelivery resultDelivery,
IStatementExecutor.ResultMetadata outMetadata,
int resultSetIdCounter) throws HyracksDataException,
AlgebricksException;
}
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 92c487b..4ef2705 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -18,17 +18,24 @@
*/
package org.apache.asterix.translator;
+import java.io.Serializable;
import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.lang.common.statement.Query;
import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
+import org.apache.commons.lang3.tuple.Triple;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IClusterInfoCollector;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
/**
@@ -52,6 +59,16 @@
* A result handle is returned before the resutlts are complete
*/
ASYNC
+ }
+
+ class ResultMetadata implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final List<Triple<JobId, ResultSetId, ARecordType>> resultSets
= new ArrayList<>();
+
+ public List<Triple<JobId, ResultSetId, ARecordType>> getResultSets() {
+ return resultSets;
+ }
}
public static class Stats {
@@ -85,12 +102,14 @@
* A Hyracks dataset client object that is used to read the
results.
* @param resultDelivery
* The {@code ResultDelivery} kind required for queries in the
list of statements
+ * @param outMetadata
+ * a reference to write the metadata of executed queries
* @param stats
* a reference to write the stats of executed queries
* @throws Exception
*/
void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc,
ResultDelivery resultDelivery,
- Stats stats) throws Exception;
+ ResultMetadata outMetadata, Stats stats) throws Exception;
/**
* Compiles and execute a list of statements, with passing in client
context id and context.
@@ -101,6 +120,8 @@
* A Hyracks dataset client object that is used to read the
results.
* @param resultDelivery
* The {@code ResultDelivery} kind required for queries in the
list of statements
+ * @param outMetadata
+ * a reference to write the metadata of executed queries
* @param stats
* a reference to write the stats of executed queries
* @param clientContextId
@@ -110,7 +131,7 @@
* @throws Exception
*/
void compileAndExecute(IHyracksClientConnection hcc, IHyracksDataset hdc,
ResultDelivery resultDelivery,
- Stats stats, String clientContextId, IStatementExecutorContext
ctx) throws Exception;
+ ResultMetadata outMetadata, Stats stats, String clientContextId,
IStatementExecutorContext ctx) throws Exception;
/**
* rewrites and compiles query into a hyracks job specifications
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java
index 23365de..b244c0c 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorFactory.java
@@ -37,14 +37,14 @@
*
* @param statements
* Statements to execute
- * @param conf
- * request configuration
+ * @param output
+ * output and request configuration
* @param compilationProvider
* provides query language related components
* @param storageComponentProvider
* provides storage related components
* @return an implementation of {@code IStatementExecutor} thaxt is used
to execute the passed list of statements
*/
- IStatementExecutor create(ICcApplicationContext appCtx, List<Statement>
statements, SessionConfig conf,
+ IStatementExecutor create(ICcApplicationContext appCtx, List<Statement>
statements, SessionOutput output,
ILangCompilationProvider compilationProvider,
IStorageComponentProvider storageComponentProvider);
}
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
index a637e2f..cfd4e87 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java
@@ -18,12 +18,9 @@
*/
package org.apache.asterix.translator;
-import java.io.PrintWriter;
+import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import
org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
/**
* SessionConfig captures several different parameters for controlling
@@ -38,8 +35,10 @@
* execution output - LOSSLESS_JSON, CSV, etc.
* <li>It allows you to specify output format-specific parameters.
*/
+public class SessionConfig implements Serializable {
-public class SessionConfig {
+ private static final long serialVersionUID = 1L;
+
/**
* Used to specify the output format for the primary execution.
*/
@@ -105,53 +104,25 @@
*/
public static final String FORMAT_QUOTE_RECORD = "quote-record";
- @FunctionalInterface
- public interface ResultDecorator {
- AlgebricksAppendable append(AlgebricksAppendable app) throws
AlgebricksException;
- }
-
- @FunctionalInterface
- public interface ResultAppender {
- AlgebricksAppendable append(AlgebricksAppendable app, String str)
throws AlgebricksException;
- }
+ // Output format.
+ private final OutputFormat fmt;
// Standard execution flags.
private final boolean executeQuery;
private final boolean generateJobSpec;
private final boolean optimize;
- // Output path for primary execution.
- private final PrintWriter out;
-
- // Output format.
- private final OutputFormat fmt;
-
- private final ResultDecorator preResultDecorator;
- private final ResultDecorator postResultDecorator;
- private final ResultAppender handleAppender;
- private final ResultAppender statusAppender;
-
// Flags.
private final Map<String, Boolean> flags;
- public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator
preResultDecorator,
- ResultDecorator postResultDecorator, ResultAppender
handleAppender, ResultAppender statusAppender) {
- this(out, fmt, preResultDecorator, postResultDecorator,
handleAppender, statusAppender,
- true, true, true);
- }
-
- public SessionConfig(PrintWriter out, OutputFormat fmt, boolean optimize,
boolean executeQuery,
- boolean generateJobSpec) {
- this(out, fmt, null, null, null, null, optimize, executeQuery,
generateJobSpec);
+ public SessionConfig(OutputFormat fmt) {
+ this(fmt, true, true, true);
}
/**
* Create a SessionConfig object with all optional values set to defaults:
* - All format flags set to "false".
* - All out-of-band outputs set to "false".
- *
- * @param out
- * PrintWriter for execution output.
* @param fmt
* Output format for execution output.
* @param optimize
@@ -160,17 +131,9 @@
* Whether to execute the query or not.
* @param generateJobSpec
* Whether to generate the Hyracks job specification (if
- * false, job cannot be executed).
*/
- public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator
preResultDecorator,
- ResultDecorator postResultDecorator, ResultAppender
handleAppender, ResultAppender statusAppender,
- boolean optimize, boolean executeQuery, boolean generateJobSpec) {
- this.out = out;
+ public SessionConfig(OutputFormat fmt, boolean optimize, boolean
executeQuery, boolean generateJobSpec) {
this.fmt = fmt;
- this.preResultDecorator = preResultDecorator;
- this.postResultDecorator = postResultDecorator;
- this.handleAppender = handleAppender;
- this.statusAppender = statusAppender;
this.optimize = optimize;
this.executeQuery = executeQuery;
this.generateJobSpec = generateJobSpec;
@@ -178,33 +141,10 @@
}
/**
- * Retrieve the PrintWriter to produce output to.
- */
- public PrintWriter out() {
- return this.out;
- }
-
- /**
* Retrieve the OutputFormat for this execution.
*/
public OutputFormat fmt() {
return this.fmt;
- }
-
- public AlgebricksAppendable resultPrefix(AlgebricksAppendable app) throws
AlgebricksException {
- return this.preResultDecorator != null ?
this.preResultDecorator.append(app) : app;
- }
-
- public AlgebricksAppendable resultPostfix(AlgebricksAppendable app) throws
AlgebricksException {
- return this.postResultDecorator != null ?
this.postResultDecorator.append(app) : app;
- }
-
- public AlgebricksAppendable appendHandle(AlgebricksAppendable app, String
handle) throws AlgebricksException {
- return this.handleAppender != null ? this.handleAppender.append(app,
handle) : app;
- }
-
- public AlgebricksAppendable appendStatus(AlgebricksAppendable app, String
status) throws AlgebricksException {
- return this.statusAppender != null ? this.statusAppender.append(app,
status) : app;
}
/**
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java
new file mode 100644
index 0000000..b559df8
--- /dev/null
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionOutput.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.translator;
+
+import java.io.PrintWriter;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import
org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
+
+public class SessionOutput {
+ private final SessionConfig config;
+
+ // Output path for primary execution.
+ private final PrintWriter out;
+
+ private final SessionOutput.ResultDecorator preResultDecorator;
+ private final SessionOutput.ResultDecorator postResultDecorator;
+ private final SessionOutput.ResultAppender handleAppender;
+ private final SessionOutput.ResultAppender statusAppender;
+
+ public SessionOutput(SessionConfig config, PrintWriter out) {
+ this(config, out, null, null, null, null);
+ }
+
+ public SessionOutput(SessionConfig config, PrintWriter out,
ResultDecorator preResultDecorator,
+ ResultDecorator postResultDecorator, ResultAppender
handleAppender, ResultAppender statusAppender) {
+ this.config = config;
+ this.out = out;
+ this.preResultDecorator = preResultDecorator;
+ this.postResultDecorator = postResultDecorator;
+ this.handleAppender = handleAppender;
+ this.statusAppender = statusAppender;
+ }
+
+ /**
+ * Retrieve the PrintWriter to produce output to.
+ */
+ public PrintWriter out() {
+ return this.out;
+ }
+
+ public AlgebricksAppendable resultPrefix(AlgebricksAppendable app) throws
AlgebricksException {
+ return this.preResultDecorator != null ?
this.preResultDecorator.append(app) : app;
+ }
+
+ public AlgebricksAppendable resultPostfix(AlgebricksAppendable app) throws
AlgebricksException {
+ return this.postResultDecorator != null ?
this.postResultDecorator.append(app) : app;
+ }
+
+ public AlgebricksAppendable appendHandle(AlgebricksAppendable app, String
handle) throws AlgebricksException {
+ return this.handleAppender != null ? this.handleAppender.append(app,
handle) : app;
+ }
+
+ public AlgebricksAppendable appendStatus(AlgebricksAppendable app, String
status) throws AlgebricksException {
+ return this.statusAppender != null ? this.statusAppender.append(app,
status) : app;
+ }
+
+ public SessionConfig config() {
+ return config;
+ }
+
+ @FunctionalInterface
+ public interface ResultDecorator {
+ AlgebricksAppendable append(AlgebricksAppendable app) throws
AlgebricksException;
+ }
+
+ @FunctionalInterface
+ public interface ResultAppender {
+ AlgebricksAppendable append(AlgebricksAppendable app, String str)
throws AlgebricksException;
+ }
+}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
index b12935d..5a66fd6 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
@@ -67,6 +67,7 @@
import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
import org.apache.asterix.utils.ResourceUtils;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -152,31 +153,32 @@
}
}
- private void printPlanPrefix(SessionConfig conf, String planName) {
- if (conf.is(SessionConfig.FORMAT_HTML)) {
- conf.out().println("<h4>" + planName + ":</h4>");
- conf.out().println("<pre>");
+ private void printPlanPrefix(SessionOutput output, String planName) {
+ if (output.config().is(SessionConfig.FORMAT_HTML)) {
+ output.out().println("<h4>" + planName + ":</h4>");
+ output.out().println("<pre>");
} else {
- conf.out().println("----------" + planName + ":");
+ output.out().println("----------" + planName + ":");
}
}
- private void printPlanPostfix(SessionConfig conf) {
- if (conf.is(SessionConfig.FORMAT_HTML)) {
- conf.out().println("</pre>");
+ private void printPlanPostfix(SessionOutput output) {
+ if (output.config().is(SessionConfig.FORMAT_HTML)) {
+ output.out().println("</pre>");
}
}
public Pair<IReturningStatement, Integer> reWriteQuery(List<FunctionDecl>
declaredFunctions,
- MetadataProvider metadataProvider, IReturningStatement q,
SessionConfig conf) throws CompilationException {
+ MetadataProvider metadataProvider, IReturningStatement q,
SessionOutput output) throws CompilationException {
if (q == null) {
return null;
}
+ SessionConfig conf = output.config();
if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) &&
conf.is(SessionConfig.OOB_EXPR_TREE)) {
- conf.out().println();
- printPlanPrefix(conf, "Expression tree");
- q.accept(astPrintVisitorFactory.createLangVisitor(conf.out()), 0);
- printPlanPostfix(conf);
+ output.out().println();
+ printPlanPrefix(output, "Expression tree");
+ q.accept(astPrintVisitorFactory.createLangVisitor(output.out()),
0);
+ printPlanPostfix(output);
}
IQueryRewriter rw = rewriterFactory.createQueryRewriter();
rw.rewrite(declaredFunctions, q, metadataProvider, new
LangRewritingContext(q.getVarCounter()));
@@ -184,17 +186,18 @@
}
public JobSpecification compileQuery(IClusterInfoCollector
clusterInfoCollector, MetadataProvider metadataProvider,
- Query rwQ, int varCounter, String outputDatasetName, SessionConfig
conf, ICompiledDmlStatement statement)
+ Query rwQ, int varCounter, String outputDatasetName, SessionOutput
output, ICompiledDmlStatement statement)
throws AlgebricksException, RemoteException, ACIDException {
+ SessionConfig conf = output.config();
if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) &&
conf.is(SessionConfig.OOB_REWRITTEN_EXPR_TREE)) {
- conf.out().println();
+ output.out().println();
- printPlanPrefix(conf, "Rewritten expression tree");
+ printPlanPrefix(output, "Rewritten expression tree");
if (rwQ != null) {
-
rwQ.accept(astPrintVisitorFactory.createLangVisitor(conf.out()), 0);
+
rwQ.accept(astPrintVisitorFactory.createLangVisitor(output.out()), 0);
}
- printPlanPostfix(conf);
+ printPlanPostfix(output);
}
org.apache.asterix.common.transactions.JobId asterixJobId =
JobIdFactory.generateJobId();
@@ -211,14 +214,14 @@
}
if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) &&
conf.is(SessionConfig.OOB_LOGICAL_PLAN)) {
- conf.out().println();
+ output.out().println();
- printPlanPrefix(conf, "Logical plan");
+ printPlanPrefix(output, "Logical plan");
if (rwQ != null || (statement != null && statement.getKind() ==
Statement.Kind.LOAD)) {
- LogicalOperatorPrettyPrintVisitor pvisitor = new
LogicalOperatorPrettyPrintVisitor(conf.out());
+ LogicalOperatorPrettyPrintVisitor pvisitor = new
LogicalOperatorPrettyPrintVisitor(output.out());
PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
}
- printPlanPostfix(conf);
+ printPlanPostfix(output);
}
CompilerProperties compilerProperties =
metadataProvider.getApplicationContext().getCompilerProperties();
int frameSize = compilerProperties.getFrameSize();
@@ -264,15 +267,15 @@
if (conf.is(SessionConfig.OOB_OPTIMIZED_LOGICAL_PLAN)) {
if (conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS)) {
// For Optimizer tests.
- AlgebricksAppendable buffer = new
AlgebricksAppendable(conf.out());
+ AlgebricksAppendable buffer = new
AlgebricksAppendable(output.out());
PlanPrettyPrinter.printPhysicalOps(plan, buffer, 0);
} else {
- printPlanPrefix(conf, "Optimized logical plan");
+ printPlanPrefix(output, "Optimized logical plan");
if (rwQ != null || (statement != null &&
statement.getKind() == Statement.Kind.LOAD)) {
- LogicalOperatorPrettyPrintVisitor pvisitor = new
LogicalOperatorPrettyPrintVisitor(conf.out());
+ LogicalOperatorPrettyPrintVisitor pvisitor = new
LogicalOperatorPrettyPrintVisitor(output.out());
PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
}
- printPlanPostfix(conf);
+ printPlanPostfix(output);
}
}
}
@@ -280,7 +283,7 @@
try {
LogicalOperatorPrettyPrintVisitor pvisitor = new
LogicalOperatorPrettyPrintVisitor();
PlanPrettyPrinter.printPlan(plan, pvisitor, 0);
-
ResultUtil.printResults(metadataProvider.getApplicationContext(),
pvisitor.get().toString(), conf,
+
ResultUtil.printResults(metadataProvider.getApplicationContext(),
pvisitor.get().toString(), output,
new Stats(), null);
return null;
} catch (IOException e) {
@@ -336,17 +339,17 @@
}
if (conf.is(SessionConfig.OOB_HYRACKS_JOB)) {
- printPlanPrefix(conf, "Hyracks job");
+ printPlanPrefix(output, "Hyracks job");
if (rwQ != null) {
try {
- conf.out().println(
+ output.out().println(
new
ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(spec.toJSON()));
} catch (IOException e) {
throw new AlgebricksException(e);
}
- conf.out().println(spec.getUserConstraints());
+ output.out().println(spec.getUserConstraints());
}
- printPlanPostfix(conf);
+ printPlanPostfix(output);
}
return spec;
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
index a9c4b39..a4e72f7 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
@@ -26,7 +26,7 @@
import java.util.concurrent.ConcurrentMap;
import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -35,7 +35,7 @@
import org.apache.hyracks.http.server.AbstractServlet;
public class AbstractQueryApiServlet extends AbstractServlet {
- protected final ICcApplicationContext appCtx;
+ protected final IApplicationContext appCtx;
public enum ResultFields {
REQUEST_ID("requestID"),
@@ -93,7 +93,7 @@
}
}
- AbstractQueryApiServlet(ICcApplicationContext appCtx,
ConcurrentMap<String, Object> ctx, String[] paths) {
+ AbstractQueryApiServlet(IApplicationContext appCtx, ConcurrentMap<String,
Object> ctx, String[] paths) {
super(ctx, paths);
this.appCtx = appCtx;
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index 3384332..7874aa3 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -50,6 +50,7 @@
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionConfig.OutputFormat;
+import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.client.dataset.HyracksDataset;
@@ -137,19 +138,20 @@
}
IParser parser = parserFactory.createParser(query);
List<Statement> aqlStatements = parser.parse();
- SessionConfig sessionConfig = new SessionConfig(out, format, true,
isSet(executeQuery), true);
+ SessionConfig sessionConfig = new SessionConfig(format, true,
isSet(executeQuery), true);
sessionConfig.set(SessionConfig.FORMAT_HTML, true);
sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, csvAndHeader);
sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY,
isSet(wrapperArray));
sessionConfig.setOOBData(isSet(printExprParam),
isSet(printRewrittenExprParam),
isSet(printLogicalPlanParam),
isSet(printOptimizedLogicalPlanParam), isSet(printJob));
+ SessionOutput sessionOutput = new SessionOutput(sessionConfig,
out);
MetadataManager.INSTANCE.init();
- IStatementExecutor translator =
statementExectorFactory.create(appCtx, aqlStatements, sessionConfig,
+ IStatementExecutor translator =
statementExectorFactory.create(appCtx, aqlStatements, sessionOutput,
compilationProvider, componentProvider);
double duration;
long startTime = System.currentTimeMillis();
translator.compileAndExecute(hcc, hds,
IStatementExecutor.ResultDelivery.IMMEDIATE,
- new IStatementExecutor.Stats());
+ null, new IStatementExecutor.Stats());
long endTime = System.currentTimeMillis();
duration = (endTime - startTime) / 1000.00;
out.println(HTML_STATEMENT_SEPARATOR);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
index 401f55e..42e23ba 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java
@@ -25,9 +25,9 @@
import org.apache.asterix.app.result.ResultHandle;
import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.api.dataset.DatasetJobRecord;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.ErrorCode;
@@ -41,7 +41,7 @@
public class QueryResultApiServlet extends AbstractQueryApiServlet {
private static final Logger LOGGER =
Logger.getLogger(QueryResultApiServlet.class.getName());
- public QueryResultApiServlet(ConcurrentMap<String, Object> ctx, String[]
paths, ICcApplicationContext appCtx) {
+ public QueryResultApiServlet(ConcurrentMap<String, Object> ctx, String[]
paths, IApplicationContext appCtx) {
super(appCtx, ctx, paths);
}
@@ -94,8 +94,8 @@
// way to send the same OutputFormat value here as was
// originally determined there. Need to save this value on
// some object that we can obtain here.
- SessionConfig sessionConfig = RestApiServlet.initResponse(request,
response);
- ResultUtil.printResults(appCtx, resultReader, sessionConfig, new
Stats(), null);
+ SessionOutput sessionOutput = RestApiServlet.initResponse(request,
response);
+ ResultUtil.printResults(appCtx, resultReader, sessionOutput, new
Stats(), null);
} catch (HyracksDataException e) {
final int errorCode = e.getErrorCode();
if (ErrorCode.NO_RESULTSET == errorCode) {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 20bffc4..97890fd 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -23,22 +23,30 @@
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.api.http.ctx.StatementExecutorContext;
+import org.apache.asterix.algebra.base.ILangExtension;
import org.apache.asterix.api.http.servlet.ServletConstants;
+import org.apache.asterix.app.message.ExecuteStatementRequestMessage;
+import org.apache.asterix.app.message.ExecuteStatementResponseMessage;
+import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.IClusterManagementWork;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.lang.aql.parser.TokenMgrError;
import org.apache.asterix.lang.common.base.IParser;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
@@ -46,8 +54,13 @@
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.asterix.translator.SessionConfig;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import
org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
+import org.apache.asterix.translator.SessionOutput;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.messages.IMessage;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.utils.HttpUtil;
@@ -66,19 +79,24 @@
public class QueryServiceServlet extends AbstractQueryApiServlet {
private static final Logger LOGGER =
Logger.getLogger(QueryServiceServlet.class.getName());
+ private final ILangExtension.Language queryLanguage;
private final ILangCompilationProvider compilationProvider;
private final IStatementExecutorFactory statementExecutorFactory;
private final IStorageComponentProvider componentProvider;
- private final IStatementExecutorContext queryCtx = new
StatementExecutorContext();
+ private final IStatementExecutorContext queryCtx;
+ private final IServiceContext serviceCtx;
- public QueryServiceServlet(ConcurrentMap<String, Object> ctx, String[]
paths, ICcApplicationContext appCtx,
+ public QueryServiceServlet(ConcurrentMap<String, Object> ctx, String[]
paths, IApplicationContext appCtx,
+ ILangExtension.Language queryLanguage,
ILangCompilationProvider compilationProvider,
IStatementExecutorFactory statementExecutorFactory,
IStorageComponentProvider componentProvider) {
super(appCtx, ctx, paths);
+ this.queryLanguage = queryLanguage;
this.compilationProvider = compilationProvider;
this.statementExecutorFactory = statementExecutorFactory;
this.componentProvider = componentProvider;
- ctx.put(ServletConstants.RUNNING_QUERIES_ATTR, queryCtx);
+ this.queryCtx = (IStatementExecutorContext)
ctx.get(ServletConstants.RUNNING_QUERIES_ATTR);
+ this.serviceCtx = (IServiceContext)
ctx.get(ServletConstants.SERVICE_CONTEXT_ATTR);
}
@Override
@@ -235,40 +253,22 @@
return SessionConfig.OutputFormat.CLEAN_JSON;
}
- private static SessionConfig createSessionConfig(RequestParameters param,
String handleUrl,
+ private static SessionOutput createSessionOutput(RequestParameters param,
String handleUrl,
PrintWriter resultWriter) {
- SessionConfig.ResultDecorator resultPrefix = new
SessionConfig.ResultDecorator() {
- int resultNo = -1;
-
- @Override
- public AlgebricksAppendable append(AlgebricksAppendable app)
throws AlgebricksException {
- app.append("\t\"");
- app.append(ResultFields.RESULTS.str());
- if (resultNo >= 0) {
- app.append('-').append(String.valueOf(resultNo));
- }
- ++resultNo;
- app.append("\": ");
- return app;
- }
- };
-
- SessionConfig.ResultDecorator resultPostfix = app ->
app.append("\t,\n");
- SessionConfig.ResultAppender appendHandle = (app, handle) ->
app.append("\t\"")
- .append(ResultFields.HANDLE.str()).append("\":
\"").append(handleUrl).append(handle).append("\",\n");
- SessionConfig.ResultAppender appendStatus = (app, status) ->
app.append("\t\"")
- .append(ResultFields.STATUS.str()).append("\":
\"").append(status).append("\",\n");
+ SessionOutput.ResultDecorator resultPrefix =
ResultUtil.createPreResultDecorator();
+ SessionOutput.ResultDecorator resultPostfix =
ResultUtil.createPostResultDecorator();
+ SessionOutput.ResultAppender appendHandle =
ResultUtil.createResultHandleAppender(handleUrl);
+ SessionOutput.ResultAppender appendStatus =
ResultUtil.createResultStatusAppender();
SessionConfig.OutputFormat format = getFormat(param.format);
- SessionConfig sessionConfig =
- new SessionConfig(resultWriter, format, resultPrefix,
resultPostfix, appendHandle, appendStatus);
+ SessionConfig sessionConfig = new SessionConfig(format);
sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, true);
sessionConfig.set(SessionConfig.FORMAT_INDENT_JSON, param.pretty);
sessionConfig.set(SessionConfig.FORMAT_QUOTE_RECORD,
format != SessionConfig.OutputFormat.CLEAN_JSON && format !=
SessionConfig.OutputFormat.LOSSLESS_JSON);
sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, format ==
SessionConfig.OutputFormat.CSV
&& "present".equals(getParameterValue(param.format,
Attribute.HEADER.str())));
- return sessionConfig;
+ return new SessionOutput(sessionConfig, resultWriter, resultPrefix,
resultPostfix, appendHandle, appendStatus);
}
private static void printClientContextID(PrintWriter pw, RequestParameters
params) {
@@ -406,7 +406,8 @@
ResultDelivery delivery = parseResultDelivery(param.mode);
String handleUrl = getHandleUrl(param.host, param.path, delivery);
- SessionConfig sessionConfig = createSessionConfig(param, handleUrl,
resultWriter);
+ SessionOutput sessionOutput = createSessionOutput(param, handleUrl,
resultWriter);
+ SessionConfig sessionConfig = sessionOutput.config();
HttpUtil.setContentType(response,
HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
HttpResponseStatus status = HttpResponseStatus.OK;
@@ -428,28 +429,77 @@
if (param.statement == null || param.statement.isEmpty()) {
throw new AsterixException("Empty request, no statement
provided");
}
- IParser parser =
compilationProvider.getParserFactory().createParser(param.statement + ";");
- List<Statement> statements = parser.parse();
- MetadataManager.INSTANCE.init();
- IStatementExecutor translator =
- statementExecutorFactory.create(appCtx, statements,
sessionConfig, compilationProvider,
- componentProvider);
- execStart = System.nanoTime();
- translator.compileAndExecute(getHyracksClientConnection(),
getHyracksDataset(), delivery, stats,
- param.clientContextID, queryCtx);
- execEnd = System.nanoTime();
+
+ String statementsText = param.statement + ";";
+
+ if (statementExecutorFactory != null) {
+ // Running on CC -> execute directly
+ IParser parser =
compilationProvider.getParserFactory().createParser(statementsText);
+ List<Statement> statements = parser.parse();
+ MetadataManager.INSTANCE.init();
+ IStatementExecutor translator =
statementExecutorFactory.create((ICcApplicationContext) appCtx,
+ statements, sessionOutput, compilationProvider,
componentProvider);
+ execStart = System.nanoTime();
+ translator.compileAndExecute(getHyracksClientConnection(),
getHyracksDataset(), delivery, null,
+ stats, param.clientContextID, queryCtx);
+ } else {
+ // Running on NC -> send 'execute' message to CC
+ INCServiceContext ncCtx = (INCServiceContext) serviceCtx;
+ INCMessageBroker ncMb = (INCMessageBroker)
ncCtx.getMessageBroker();
+ UUID ncMessageId = UUID.randomUUID();
+ ResultDelivery ccDelivery =
+ delivery == ResultDelivery.IMMEDIATE ?
ResultDelivery.DEFERRED : delivery;
+ ExecuteStatementRequestMessage requestMsg =
+ new ExecuteStatementRequestMessage(ncCtx.getNodeId(),
ncMessageId, queryLanguage,
+ statementsText, sessionConfig, ccDelivery,
param.clientContextID, handleUrl);
+ ExecuteStatementResponseMessage responseMsg;
+ Future<IMessage> responseFuture =
ncMb.registerFuture(ncMessageId);
+ try {
+ execStart = System.nanoTime();
+ ncMb.sendMessageToCC(requestMsg);
+ responseMsg = (ExecuteStatementResponseMessage)
responseFuture.get(
+
ExecuteStatementResponseMessage.DEFAULT_TIMEOUT_MILLIS,
+ java.util.concurrent.TimeUnit.MILLISECONDS);
+ execStart = System.nanoTime();
+ } finally {
+ ncMb.deregisterFuture(ncMessageId);
+ }
+
+ Throwable err = responseMsg.getError();
+ if (err != null) {
+ if (err instanceof Exception) {
+ throw (Exception)err;
+ } else if (err instanceof TokenMgrError) {
+ throw (TokenMgrError)err;
+ } else if (err instanceof
org.apache.asterix.aqlplus.parser.TokenMgrError) {
+ throw
(org.apache.asterix.aqlplus.parser.TokenMgrError)err;
+ } else {
+ throw new Exception(err.toString());
+ }
+ }
+
+ IStatementExecutor.ResultMetadata resultMetadata =
responseMsg.getMetadata();
+ if (delivery == ResultDelivery.IMMEDIATE &&
!resultMetadata.getResultSets().isEmpty()) {
+ for (Triple<JobId, ResultSetId, ARecordType> rsmd :
resultMetadata.getResultSets()) {
+ ResultReader resultReader = new
ResultReader(getHyracksDataset(), rsmd.getLeft(), rsmd.getMiddle());
+ ResultUtil.printResults(appCtx, resultReader,
sessionOutput, stats, rsmd.getRight());
+ }
+ } else {
+ sessionOutput.out().append(responseMsg.getResult());
+ }
+ }
if (ResultDelivery.IMMEDIATE == delivery ||
ResultDelivery.DEFERRED == delivery) {
- ResultUtil.printStatus(sessionConfig, ResultStatus.SUCCESS);
+ ResultUtil.printStatus(sessionOutput, ResultStatus.SUCCESS);
}
} catch (AsterixException | TokenMgrError |
org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
ResultUtil.printError(resultWriter, pe);
- ResultUtil.printStatus(sessionConfig, ResultStatus.FATAL);
+ ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL);
status = HttpResponseStatus.BAD_REQUEST;
} catch (Exception e) {
GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
ResultUtil.printError(resultWriter, e);
- ResultUtil.printStatus(sessionConfig, ResultStatus.FATAL);
+ ResultUtil.printStatus(sessionOutput, ResultStatus.FATAL);
status = HttpResponseStatus.INTERNAL_SERVER_ERROR;
} finally {
if (execStart == -1) {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
index d0c574e..71dddc0 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryStatusApiServlet.java
@@ -29,7 +29,7 @@
import org.apache.asterix.app.result.ResultHandle;
import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.hyracks.api.dataset.DatasetJobRecord;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.http.api.IServletRequest;
@@ -41,7 +41,7 @@
public class QueryStatusApiServlet extends AbstractQueryApiServlet {
private static final Logger LOGGER =
Logger.getLogger(QueryStatusApiServlet.class.getName());
- public QueryStatusApiServlet(ConcurrentMap<String, Object> ctx, String[]
paths, ICcApplicationContext appCtx) {
+ public QueryStatusApiServlet(ConcurrentMap<String, Object> ctx, String[]
paths, IApplicationContext appCtx) {
super(appCtx, ctx, paths);
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index e339ba9..6b1e408 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -45,6 +45,7 @@
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionConfig.OutputFormat;
+import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.client.dataset.HyracksDataset;
@@ -84,7 +85,7 @@
* SessionConfig with the appropriate output writer and output-format
* based on the Accept: header and other servlet parameters.
*/
- static SessionConfig initResponse(IServletRequest request,
IServletResponse response) throws IOException {
+ static SessionOutput initResponse(IServletRequest request,
IServletResponse response) throws IOException {
HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_PLAIN,
HttpUtil.Encoding.UTF8);
// CLEAN_JSON output is the default; most generally useful for a
// programmatic HTTP API
@@ -114,9 +115,9 @@
format = OutputFormat.LOSSLESS_JSON;
}
- SessionConfig.ResultAppender appendHandle = (app, handle) ->
app.append("{ \"").append("handle")
+ SessionOutput.ResultAppender appendHandle = (app, handle) ->
app.append("{ \"").append("handle")
.append("\":" + " \"").append(handle).append("\" }");
- SessionConfig sessionConfig = new SessionConfig(response.writer(),
format, null, null, appendHandle, null);
+ SessionConfig sessionConfig = new SessionConfig(format);
// If it's JSON or ADM, check for the "wrapper-array" flag. Default is
// "true" for JSON and "false" for ADM. (Not applicable for CSV.)
@@ -152,7 +153,7 @@
default:
throw new IOException("Unknown format " + format);
}
- return sessionConfig;
+ return new SessionOutput(sessionConfig, response.writer(), null, null,
appendHandle, null);
}
@Override
@@ -171,9 +172,9 @@
// enable cross-origin resource sharing
response.setHeader("Access-Control-Allow-Origin", "*");
response.setHeader("Access-Control-Allow-Headers", "Origin,
X-Requested-With, Content-Type, Accept");
- SessionConfig sessionConfig = initResponse(request, response);
+ SessionOutput sessionOutput = initResponse(request, response);
QueryTranslator.ResultDelivery resultDelivery =
whichResultDelivery(request);
- doHandle(response, query, sessionConfig, resultDelivery);
+ doHandle(response, query, sessionOutput, resultDelivery);
} catch (Exception e) {
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
LOGGER.log(Level.WARNING, "Failure handling request", e);
@@ -181,7 +182,7 @@
}
}
- private void doHandle(IServletResponse response, String query,
SessionConfig sessionConfig,
+ private void doHandle(IServletResponse response, String query,
SessionOutput sessionOutput,
ResultDelivery resultDelivery) throws JsonProcessingException {
try {
response.setStatus(HttpResponseStatus.OK);
@@ -201,20 +202,20 @@
List<Statement> aqlStatements = parser.parse();
validate(aqlStatements);
MetadataManager.INSTANCE.init();
- IStatementExecutor translator =
statementExecutorFactory.create(appCtx, aqlStatements, sessionConfig,
+ IStatementExecutor translator =
statementExecutorFactory.create(appCtx, aqlStatements, sessionOutput,
compilationProvider, componentProvider);
- translator.compileAndExecute(hcc, hds, resultDelivery, new
IStatementExecutor.Stats());
+ translator.compileAndExecute(hcc, hds, resultDelivery, null, new
IStatementExecutor.Stats());
} catch (AsterixException | TokenMgrError |
org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(), pe);
String errorMessage = ResultUtil.buildParseExceptionMessage(pe,
query);
ObjectNode errorResp =
ResultUtil.getErrorResponse(2, errorMessage, "",
ResultUtil.extractFullStackTrace(pe));
- sessionConfig.out().write(new
ObjectMapper().writeValueAsString(errorResp));
+ sessionOutput.out().write(new
ObjectMapper().writeValueAsString(errorResp));
} catch (Exception e) {
GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e);
response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
- ResultUtil.apiErrorHandler(sessionConfig.out(), e);
+ ResultUtil.apiErrorHandler(sessionOutput.out(), e);
}
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
index fe6fa89..3bcd670 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ResultUtil.java
@@ -36,10 +36,10 @@
import org.apache.asterix.app.result.ResultHandle;
import org.apache.asterix.app.result.ResultPrinter;
import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
import org.apache.http.ParseException;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import
org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
@@ -77,29 +77,29 @@
return escaped;
}
- public static void printResults(ICcApplicationContext appCtx, ResultReader
resultReader, SessionConfig conf,
+ public static void printResults(IApplicationContext appCtx, ResultReader
resultReader, SessionOutput output,
Stats stats, ARecordType recordType) throws HyracksDataException {
- new ResultPrinter(appCtx, conf, stats, recordType).print(resultReader);
+ new ResultPrinter(appCtx, output, stats,
recordType).print(resultReader);
}
- public static void printResults(ICcApplicationContext appCtx, String
record, SessionConfig conf, Stats stats,
+ public static void printResults(IApplicationContext appCtx, String record,
SessionOutput output, Stats stats,
ARecordType recordType) throws HyracksDataException {
- new ResultPrinter(appCtx, conf, stats, recordType).print(record);
+ new ResultPrinter(appCtx, output, stats, recordType).print(record);
}
- public static void printResultHandle(SessionConfig conf, ResultHandle
handle) throws HyracksDataException {
+ public static void printResultHandle(SessionOutput output, ResultHandle
handle) throws HyracksDataException {
try {
- final AlgebricksAppendable app = new
AlgebricksAppendable(conf.out());
- conf.appendHandle(app, handle.toString());
+ final AlgebricksAppendable app = new
AlgebricksAppendable(output.out());
+ output.appendHandle(app, handle.toString());
} catch (AlgebricksException e) {
LOGGER.warn("error printing handle", e);
}
}
- public static void printStatus(SessionConfig conf,
AbstractQueryApiServlet.ResultStatus rs) {
+ public static void printStatus(SessionOutput output,
AbstractQueryApiServlet.ResultStatus rs) {
try {
- final AlgebricksAppendable app = new
AlgebricksAppendable(conf.out());
- conf.appendStatus(app, rs.str());
+ final AlgebricksAppendable app = new
AlgebricksAppendable(output.out());
+ output.appendStatus(app, rs.str());
} catch (AlgebricksException e) {
LOGGER.warn("error printing status", e);
}
@@ -318,4 +318,35 @@
return errorTemplate;
}
+ public static SessionOutput.ResultDecorator createPreResultDecorator() {
+ return new SessionOutput.ResultDecorator() {
+ int resultNo = -1;
+
+ @Override
+ public AlgebricksAppendable append(AlgebricksAppendable app)
throws AlgebricksException {
+ app.append("\t\"");
+ app.append(AbstractQueryApiServlet.ResultFields.RESULTS.str());
+ if (resultNo >= 0) {
+ app.append('-').append(String.valueOf(resultNo));
+ }
+ ++resultNo;
+ app.append("\": ");
+ return app;
+ }
+ };
+ }
+
+ public static SessionOutput.ResultDecorator createPostResultDecorator() {
+ return app -> app.append("\t,\n");
+ }
+
+ public static SessionOutput.ResultAppender
createResultHandleAppender(String handleUrl) {
+ return (app, handle) ->
app.append("\t\"").append(AbstractQueryApiServlet.ResultFields.HANDLE.str())
+ .append("\":
\"").append(handleUrl).append(handle).append("\",\n");
+ }
+
+ public static SessionOutput.ResultAppender createResultStatusAppender() {
+ return (app, status) ->
app.append("\t\"").append(AbstractQueryApiServlet.ResultFields.STATUS.str())
+ .append("\": \"").append(status).append("\",\n");
+ }
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
index a9d4e22..9e1609d 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
@@ -24,6 +24,7 @@
public static final String ASTERIX_APP_CONTEXT_INFO_ATTR =
"org.apache.asterix.APP_CONTEXT_INFO";
public static final String EXECUTOR_SERVICE_ATTR =
"org.apache.asterix.EXECUTOR_SERVICE_ATTR";
public static final String RUNNING_QUERIES_ATTR =
"org.apache.asterix.RUNINNG_QUERIES";
+ public static final String SERVICE_CONTEXT_ATTR =
"org.apache.asterix.SERVICE_CONTEXT";
private ServletConstants() {
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
index ecf2c53..a9d24b9 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java
@@ -36,6 +36,7 @@
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionConfig.OutputFormat;
+import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.JobSpecification;
@@ -99,16 +100,17 @@
List<Statement> statements = parser.parse();
MetadataManager.INSTANCE.init();
- SessionConfig conf = new SessionConfig(writer, OutputFormat.ADM,
optimize, true, generateBinaryRuntime);
+ SessionConfig conf = new SessionConfig(OutputFormat.ADM, optimize,
true, generateBinaryRuntime);
conf.setOOBData(false, printRewrittenExpressions, printLogicalPlan,
printOptimizedPlan, printJob);
if (printPhysicalOpsOnly) {
conf.set(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS, true);
}
+ SessionOutput output = new SessionOutput(conf, writer);
- IStatementExecutor translator =
statementExecutorFactory.create(appCtx, statements, conf, compilationProvider,
+ IStatementExecutor translator =
statementExecutorFactory.create(appCtx, statements, output, compilationProvider,
storageComponentProvider);
translator.compileAndExecute(hcc, null,
QueryTranslator.ResultDelivery.IMMEDIATE,
- new IStatementExecutor.Stats());
+ null, new IStatementExecutor.Stats());
writer.flush();
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
index 6c6f2af..0c6b2cc 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
@@ -110,11 +110,11 @@
return statementExecutorFactory;
}
- public ILangCompilationProvider getAqlCompilationProvider() {
- return aqlCompilationProvider;
- }
-
- public ILangCompilationProvider getSqlppCompilationProvider() {
- return sqlppCompilationProvider;
+ public ILangCompilationProvider getCompilationProvider(Language lang) {
+ switch (lang) {
+ case AQL: return aqlCompilationProvider;
+ case SQLPP: return sqlppCompilationProvider;
+ default: throw new IllegalArgumentException(String.valueOf(lang));
+ }
}
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
new file mode 100644
index 0000000..59d516f
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.app.message;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.api.http.server.ResultUtil;
+import org.apache.asterix.app.cc.CCExtensionManager;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.hyracks.bootstrap.CCApplication;
+import org.apache.asterix.lang.aql.parser.TokenMgrError;
+import org.apache.asterix.lang.common.base.IParser;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorContext;
+import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+
+public final class ExecuteStatementRequestMessage implements
ICcAddressedMessage {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOGGER =
Logger.getLogger(ExecuteStatementRequestMessage.class.getName());
+
+ private final String requestNodeId;
+
+ private final UUID requestMessageId;
+
+ private final ILangExtension.Language lang;
+
+ private final String statementsText;
+
+ private final SessionConfig sessionConfig;
+
+ private final IStatementExecutor.ResultDelivery delivery;
+
+ private final String clientContextID;
+
+ private final String handleUrl;
+
+ public ExecuteStatementRequestMessage(String requestNodeId, UUID
requestMessageId, ILangExtension.Language lang,
+ String statementsText, SessionConfig sessionConfig,
IStatementExecutor.ResultDelivery delivery,
+ String clientContextID, String handleUrl) {
+ this.requestNodeId = requestNodeId;
+ this.requestMessageId = requestMessageId;
+ this.lang = lang;
+ this.statementsText = statementsText;
+ this.sessionConfig = sessionConfig;
+ this.delivery = delivery;
+ this.clientContextID = clientContextID;
+ this.handleUrl = handleUrl;
+ }
+
+ @Override
+ public void handle(ICcApplicationContext ccAppCtx) throws
HyracksDataException, InterruptedException {
+ ICCServiceContext ccSrvContext = ccAppCtx.getServiceContext();
+ ClusterControllerService ccSrv = (ClusterControllerService)
ccSrvContext.getControllerService();
+ CCApplication ccApp = (CCApplication) ccSrv.getApplication();
+ CCMessageBroker messageBroker = (CCMessageBroker)
ccSrvContext.getMessageBroker();
+ CCExtensionManager ccExtMgr = (CCExtensionManager)
ccAppCtx.getExtensionManager();
+ ILangCompilationProvider compilationProvider =
ccExtMgr.getCompilationProvider(lang);
+ IStorageComponentProvider storageComponentProvider =
ccAppCtx.getStorageComponentProvider();
+ IStatementExecutorFactory statementExecutorFactory =
ccApp.getStatementExecutorFactory();
+ IStatementExecutorContext statementExecutorContext =
ccApp.getStatementExecutorContext();
+
+ ccSrv.getExecutor().submit(() -> {
+ ExecuteStatementResponseMessage responseMsg = new
ExecuteStatementResponseMessage(requestMessageId);
+
+ try {
+ IParser parser =
compilationProvider.getParserFactory().createParser(statementsText);
+ List<Statement> statements = parser.parse();
+
+ StringWriter outWriter = new StringWriter(256);
+ PrintWriter outPrinter = new PrintWriter(outWriter);
+ SessionOutput.ResultDecorator resultPrefix =
ResultUtil.createPreResultDecorator();
+ SessionOutput.ResultDecorator resultPostfix =
ResultUtil.createPostResultDecorator();
+ SessionOutput.ResultAppender appendHandle =
ResultUtil.createResultHandleAppender(handleUrl);
+ SessionOutput.ResultAppender appendStatus =
ResultUtil.createResultStatusAppender();
+ SessionOutput sessionOutput = new SessionOutput(sessionConfig,
outPrinter, resultPrefix, resultPostfix,
+ appendHandle, appendStatus);
+
+ IStatementExecutor.ResultMetadata outMetadata = new
IStatementExecutor.ResultMetadata();
+
+ MetadataManager.INSTANCE.init();
+ IStatementExecutor translator =
statementExecutorFactory.create(ccAppCtx, statements, sessionOutput,
+ compilationProvider, storageComponentProvider);
+ translator.compileAndExecute(ccAppCtx.getHcc(), null,
delivery, outMetadata,
+ new IStatementExecutor.Stats(), clientContextID,
statementExecutorContext);
+
+ outPrinter.close();
+ responseMsg.setResult(outWriter.toString());
+ responseMsg.setMetadata(outMetadata);
+ } catch (TokenMgrError |
org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
+ GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(),
pe);
+ responseMsg.setError(pe);
+ } catch (AsterixException pe) {
+ GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, pe.getMessage(),
pe);
+ responseMsg.setError(new AsterixException(pe.getMessage()));
+ } catch (Exception e) {
+ GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(),
e);
+ responseMsg.setError(new Exception(e.toString()));
+ }
+
+ try {
+ messageBroker.sendApplicationMessageToNC(responseMsg,
requestNodeId);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, e.getMessage(), e);
+ }
+ });
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s(id=%s, from=%s): %s",
getClass().getSimpleName(), requestMessageId, requestNodeId,
+ statementsText);
+ }
+}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
new file mode 100644
index 0000000..9bd058a
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.app.message;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.messages.IMessage;
+
+public final class ExecuteStatementResponseMessage implements
INcAddressedMessage {
+ private static final long serialVersionUID = 1L;
+
+ public static final long DEFAULT_TIMEOUT_MILLIS =
TimeUnit.MINUTES.toMillis(5);
+
+ private final UUID requestMessageId;
+
+ private String result;
+
+ private IStatementExecutor.ResultMetadata metadata;
+
+ private Throwable error;
+
+ public ExecuteStatementResponseMessage(UUID requestMessageId) {
+ this.requestMessageId = requestMessageId;
+ }
+
+ @Override
+ public void handle(INcApplicationContext appCtx) throws
HyracksDataException, InterruptedException {
+ NCMessageBroker mb = (NCMessageBroker)
appCtx.getServiceContext().getMessageBroker();
+ CompletableFuture<IMessage> result =
mb.deregisterFuture(requestMessageId);
+ if (result != null) {
+ result.complete(this);
+ }
+ }
+
+ public Throwable getError() {
+ return error;
+ }
+
+ public void setError(Throwable error) {
+ this.error = error;
+ }
+
+ public String getResult() {
+ return result;
+ }
+
+ public void setResult(String result) {
+ this.result = result;
+ }
+
+ public IStatementExecutor.ResultMetadata getMetadata() {
+ return metadata;
+ }
+
+ public void setMetadata(IStatementExecutor.ResultMetadata metadata) {
+ this.metadata = metadata;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s(id=%s): %d characters",
getClass().getSimpleName(), requestMessageId,
+ result != null ? result.length() : 0);
+ }
+}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
index 7ed3aef..452d13e 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java
@@ -24,10 +24,11 @@
import java.io.StringWriter;
import java.nio.ByteBuffer;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import
org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
import org.apache.hyracks.api.comm.IFrame;
@@ -47,6 +48,7 @@
private final FrameManager resultDisplayFrameMgr;
+ private final SessionOutput output;
private final SessionConfig conf;
private final Stats stats;
private final ARecordType recordType;
@@ -62,8 +64,9 @@
private ObjectMapper om;
private ObjectWriter ow;
- public ResultPrinter(ICcApplicationContext appCtx, SessionConfig conf,
Stats stats, ARecordType recordType) {
- this.conf = conf;
+ public ResultPrinter(IApplicationContext appCtx, SessionOutput output,
Stats stats, ARecordType recordType) {
+ this.output = output;
+ this.conf = output.config();
this.stats = stats;
this.recordType = recordType;
this.indentJSON = conf.is(SessionConfig.FORMAT_INDENT_JSON);
@@ -112,18 +115,18 @@
// If we're outputting CSV with a header, the HTML header was already
// output by displayCSVHeader(), so skip it here
if (conf.is(SessionConfig.FORMAT_HTML)) {
- conf.out().println("<h4>Results:</h4>");
- conf.out().println("<pre class=\"result-content\">");
+ output.out().println("<h4>Results:</h4>");
+ output.out().println("<pre class=\"result-content\">");
}
try {
- conf.resultPrefix(new AlgebricksAppendable(conf.out()));
+ output.resultPrefix(new AlgebricksAppendable(output.out()));
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
if (conf.is(SessionConfig.FORMAT_WRAPPER_ARRAY)) {
- conf.out().print("[ ");
+ output.out().print("[ ");
wrapArray = true;
}
@@ -134,29 +137,29 @@
if (quoteRecord) {
StringWriter sw = new StringWriter();
appendCSVHeader(sw, recordType);
- conf.out().print(JSONUtil.quoteAndEscape(sw.toString()));
- conf.out().print("\n");
+ output.out().print(JSONUtil.quoteAndEscape(sw.toString()));
+ output.out().print("\n");
notFirst = true;
} else {
- appendCSVHeader(conf.out(), recordType);
+ appendCSVHeader(output.out(), recordType);
}
}
}
private void printPostfix() throws HyracksDataException {
- conf.out().flush();
+ output.out().flush();
if (wrapArray) {
- conf.out().println(" ]");
+ output.out().println(" ]");
}
try {
- conf.resultPostfix(new AlgebricksAppendable(conf.out()));
+ output.resultPostfix(new AlgebricksAppendable(output.out()));
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
if (conf.is(SessionConfig.FORMAT_HTML)) {
- conf.out().println("</pre>");
+ output.out().println("</pre>");
}
- conf.out().flush();
+ output.out().flush();
}
private void displayRecord(String result) throws HyracksDataException {
@@ -177,7 +180,7 @@
// TODO(tillw): this is inefficient as well
record = JSONUtil.quoteAndEscape(record);
}
- conf.out().print(record);
+ output.out().print(record);
stats.setCount(stats.getCount() + 1);
// TODO(tillw) fix this approximation
stats.setSize(stats.getSize() + record.length());
@@ -211,7 +214,7 @@
}
String result = new String(frameBytes, start, length, UTF_8);
if (wrapArray && notFirst) {
- conf.out().print(", ");
+ output.out().print(", ");
}
notFirst = true;
displayRecord(result);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
index e2f17ac..00fb1b0 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java
@@ -28,7 +28,7 @@
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorFactory;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
public class DefaultStatementExecutorFactory implements
IStatementExecutorFactory {
@@ -48,9 +48,9 @@
}
@Override
- public IStatementExecutor create(ICcApplicationContext appCtx,
List<Statement> statements, SessionConfig conf,
+ public IStatementExecutor create(ICcApplicationContext appCtx,
List<Statement> statements, SessionOutput output,
ILangCompilationProvider compilationProvider,
IStorageComponentProvider storageComponentProvider) {
- return new QueryTranslator(appCtx, statements, conf,
compilationProvider, storageComponentProvider,
+ return new QueryTranslator(appCtx, statements, output,
compilationProvider, storageComponentProvider,
executorService);
}
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index e97a7e1..46ece3e 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -161,6 +161,7 @@
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
import org.apache.asterix.translator.TypeTranslator;
import org.apache.asterix.translator.util.ValidateUtil;
import org.apache.asterix.utils.DataverseUtil;
@@ -170,6 +171,7 @@
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.commons.lang3.tuple.Triple;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -203,6 +205,7 @@
public static final boolean IS_DEBUG_MODE = false;// true
protected final List<Statement> statements;
protected final ICcApplicationContext appCtx;
+ protected final SessionOutput sessionOutput;
protected final SessionConfig sessionConfig;
protected Dataverse activeDataverse;
protected final List<FunctionDecl> declaredFunctions;
@@ -211,12 +214,13 @@
protected final IStorageComponentProvider componentProvider;
protected final ExecutorService executorService;
- public QueryTranslator(ICcApplicationContext appCtx, List<Statement>
statements, SessionConfig conf,
+ public QueryTranslator(ICcApplicationContext appCtx, List<Statement>
statements, SessionOutput output,
ILangCompilationProvider compliationProvider,
IStorageComponentProvider componentProvider,
ExecutorService executorService) {
this.appCtx = appCtx;
this.statements = statements;
- this.sessionConfig = conf;
+ this.sessionOutput = output;
+ this.sessionConfig = output.config();
this.componentProvider = componentProvider;
declaredFunctions = getDeclaredFunctions(statements);
apiFramework = new APIFramework(compliationProvider);
@@ -241,13 +245,13 @@
@Override
public void compileAndExecute(IHyracksClientConnection hcc,
IHyracksDataset hdc, ResultDelivery resultDelivery,
- Stats stats) throws Exception {
- compileAndExecute(hcc, hdc, resultDelivery, stats, null, null);
+ ResultMetadata outMetadata, Stats stats) throws Exception {
+ compileAndExecute(hcc, hdc, resultDelivery, outMetadata, stats, null,
null);
}
@Override
public void compileAndExecute(IHyracksClientConnection hcc,
IHyracksDataset hdc, ResultDelivery resultDelivery,
- Stats stats, String clientContextId, IStatementExecutorContext
ctx) throws Exception {
+ ResultMetadata outMetadata, Stats stats, String clientContextId,
IStatementExecutorContext ctx) throws Exception {
int resultSetIdCounter = 0;
FileSplit outputFile = null;
IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE;
@@ -262,7 +266,7 @@
try {
for (Statement stmt : statements) {
if (sessionConfig.is(SessionConfig.FORMAT_HTML)) {
-
sessionConfig.out().println(ApiServlet.HTML_STATEMENT_SEPARATOR);
+
sessionOutput.out().println(ApiServlet.HTML_STATEMENT_SEPARATOR);
}
validateOperation(appCtx, activeDataverse, stmt);
rewriteStatement(stmt); // Rewrite the statement's AST.
@@ -324,8 +328,8 @@
metadataProvider.setResultAsyncMode(resultDelivery
== ResultDelivery.ASYNC
|| resultDelivery ==
ResultDelivery.DEFERRED);
}
- handleInsertUpsertStatement(metadataProvider, stmt,
hcc, hdc, resultDelivery, stats, false,
- clientContextId, ctx);
+ handleInsertUpsertStatement(metadataProvider, stmt,
hcc, hdc, resultDelivery, outMetadata,
+ stats,false, clientContextId, ctx);
break;
case Statement.Kind.DELETE:
handleDeleteStatement(metadataProvider, stmt, hcc,
false);
@@ -358,8 +362,8 @@
metadataProvider.setResultSetId(new
ResultSetId(resultSetIdCounter++));
metadataProvider.setResultAsyncMode(
resultDelivery == ResultDelivery.ASYNC ||
resultDelivery == ResultDelivery.DEFERRED);
- handleQuery(metadataProvider, (Query) stmt, hcc, hdc,
resultDelivery, stats, clientContextId,
- ctx);
+ handleQuery(metadataProvider, (Query) stmt, hcc, hdc,
resultDelivery, outMetadata, stats,
+ clientContextId, ctx);
break;
case Statement.Kind.COMPACT:
handleCompactStatement(metadataProvider, stmt, hcc);
@@ -379,8 +383,8 @@
// No op
break;
case Statement.Kind.EXTENSION:
- ((IExtensionStatement) stmt).handle(this,
metadataProvider, hcc, hdc, resultDelivery, stats,
- resultSetIdCounter);
+ ((IExtensionStatement) stmt).handle(this,
metadataProvider, hcc, hdc, resultDelivery,
+ outMetadata, resultSetIdCounter);
break;
default:
throw new CompilationException("Unknown function");
@@ -1694,7 +1698,7 @@
CompiledLoadFromFileStatement cls =
new CompiledLoadFromFileStatement(dataverseName,
loadStmt.getDatasetName().getValue(),
loadStmt.getAdapter(), loadStmt.getProperties(),
loadStmt.dataIsAlreadySorted());
- JobSpecification spec = apiFramework.compileQuery(hcc,
metadataProvider, null, 0, null, sessionConfig, cls);
+ JobSpecification spec = apiFramework.compileQuery(hcc,
metadataProvider, null, 0, null, sessionOutput, cls);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
if (spec != null) {
@@ -1712,8 +1716,8 @@
public JobSpecification handleInsertUpsertStatement(MetadataProvider
metadataProvider, Statement stmt,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery
resultDelivery,
- IStatementExecutor.Stats stats, boolean compileOnly, String
clientContextId, IStatementExecutorContext ctx)
- throws Exception {
+ ResultMetadata outMetadata, Stats stats, boolean compileOnly,
String clientContextId,
+ IStatementExecutorContext ctx) throws Exception {
InsertStatement stmtInsertUpsert = (InsertStatement) stmt;
String dataverseName =
getActiveDataverse(stmtInsertUpsert.getDataverseName());
final IMetadataLocker locker = new IMetadataLocker() {
@@ -1755,7 +1759,8 @@
}
if (stmtInsertUpsert.getReturnExpression() != null) {
- deliverResult(hcc, hdc, compiler, metadataProvider, locker,
resultDelivery, stats, clientContextId, ctx);
+ deliverResult(hcc, hdc, compiler, metadataProvider, locker,
resultDelivery, outMetadata, stats,
+ clientContextId, ctx);
} else {
locker.lock();
try {
@@ -1811,11 +1816,11 @@
// Query Rewriting (happens under the same ongoing metadata
transaction)
Pair<IReturningStatement, Integer> rewrittenResult =
- apiFramework.reWriteQuery(declaredFunctions, metadataProvider,
query, sessionConfig);
+ apiFramework.reWriteQuery(declaredFunctions, metadataProvider,
query, sessionOutput);
// Query Compilation (happens under the same ongoing metadata
transaction)
return apiFramework.compileQuery(clusterInfoCollector,
metadataProvider, (Query) rewrittenResult.first,
- rewrittenResult.second, stmt == null ? null :
stmt.getDatasetName(), sessionConfig, stmt);
+ rewrittenResult.second, stmt == null ? null :
stmt.getDatasetName(), sessionOutput, stmt);
}
private JobSpecification rewriteCompileInsertUpsert(IClusterInfoCollector
clusterInfoCollector,
@@ -1824,7 +1829,7 @@
// Insert/upsert statement rewriting (happens under the same ongoing
metadata transaction)
Pair<IReturningStatement, Integer> rewrittenResult =
- apiFramework.reWriteQuery(declaredFunctions, metadataProvider,
insertUpsert, sessionConfig);
+ apiFramework.reWriteQuery(declaredFunctions, metadataProvider,
insertUpsert, sessionOutput);
InsertStatement rewrittenInsertUpsert = (InsertStatement)
rewrittenResult.first;
String dataverseName =
getActiveDataverse(rewrittenInsertUpsert.getDataverseName());
@@ -1846,7 +1851,7 @@
}
// Insert/upsert statement compilation (happens under the same ongoing
metadata transaction)
return apiFramework.compileQuery(clusterInfoCollector,
metadataProvider, rewrittenInsertUpsert.getQuery(),
- rewrittenResult.second, datasetName, sessionConfig, clfrqs);
+ rewrittenResult.second, datasetName, sessionOutput, clfrqs);
}
protected void handleCreateFeedStatement(MetadataProvider
metadataProvider, Statement stmt) throws Exception {
@@ -2052,7 +2057,7 @@
datasets.add(ds);
}
org.apache.commons.lang3.tuple.Pair<JobSpecification,
AlgebricksAbsolutePartitionConstraint> jobInfo =
- FeedOperations.buildStartFeedJob(sessionConfig,
metadataProvider, feed, feedConnections,
+ FeedOperations.buildStartFeedJob(sessionOutput,
metadataProvider, feed, feedConnections,
compilationProvider, storageComponentProvider,
qtFactory, hcc);
JobSpecification feedJob = jobInfo.getLeft();
@@ -2287,8 +2292,8 @@
}
protected void handleQuery(MetadataProvider metadataProvider, Query query,
IHyracksClientConnection hcc,
- IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
String clientContextId,
- IStatementExecutorContext ctx) throws Exception {
+ IHyracksDataset hdc, ResultDelivery resultDelivery, ResultMetadata
outMetadata, Stats stats,
+ String clientContextId, IStatementExecutorContext ctx) throws
Exception {
final IMetadataLocker locker = new IMetadataLocker() {
@Override
public void lock() {
@@ -2318,12 +2323,12 @@
throw e;
}
};
- deliverResult(hcc, hdc, compiler, metadataProvider, locker,
resultDelivery, stats, clientContextId, ctx);
+ deliverResult(hcc, hdc, compiler, metadataProvider, locker,
resultDelivery, outMetadata, stats, clientContextId, ctx);
}
private void deliverResult(IHyracksClientConnection hcc, IHyracksDataset
hdc, IStatementCompiler compiler,
- MetadataProvider metadataProvider, IMetadataLocker locker,
ResultDelivery resultDelivery, Stats stats,
- String clientContextId, IStatementExecutorContext ctx) throws
Exception {
+ MetadataProvider metadataProvider, IMetadataLocker locker,
ResultDelivery resultDelivery, ResultMetadata outMetadata,
+ Stats stats, String clientContextId, IStatementExecutorContext
ctx) throws Exception {
final ResultSetId resultSetId = metadataProvider.getResultSetId();
switch (resultDelivery) {
case ASYNC:
@@ -2339,13 +2344,17 @@
case IMMEDIATE:
createAndRunJob(hcc, null, compiler, locker, resultDelivery,
id -> {
final ResultReader resultReader = new ResultReader(hdc,
id, resultSetId);
- ResultUtil.printResults(appCtx, resultReader,
sessionConfig, stats,
- metadataProvider.findOutputRecordType());
+ ARecordType recordType =
metadataProvider.findOutputRecordType();
+ ResultUtil.printResults(appCtx, resultReader,
sessionOutput, stats, recordType);
}, clientContextId, ctx);
break;
case DEFERRED:
createAndRunJob(hcc, null, compiler, locker, resultDelivery,
id -> {
- ResultUtil.printResultHandle(sessionConfig, new
ResultHandle(id, resultSetId));
+ ResultHandle resultHandle = new ResultHandle(id,
resultSetId);
+ ResultUtil.printResultHandle(sessionOutput, resultHandle);
+ if (outMetadata != null) {
+ outMetadata.getResultSets().add(Triple.of(id,
resultSetId, metadataProvider.findOutputRecordType()));
+ }
}, clientContextId, ctx);
break;
default:
@@ -2360,8 +2369,8 @@
try {
createAndRunJob(hcc, jobId, compiler, locker, resultDelivery, id
-> {
final ResultHandle handle = new ResultHandle(id, resultSetId);
- ResultUtil.printStatus(sessionConfig,
AbstractQueryApiServlet.ResultStatus.RUNNING);
- ResultUtil.printResultHandle(sessionConfig, handle);
+ ResultUtil.printStatus(sessionOutput,
AbstractQueryApiServlet.ResultStatus.RUNNING);
+ ResultUtil.printResultHandle(sessionOutput, handle);
synchronized (printed) {
printed.setTrue();
printed.notify();
@@ -2370,8 +2379,8 @@
} catch (Exception e) {
if (JobId.INVALID.equals(jobId.getValue())) {
// compilation failed
- ResultUtil.printStatus(sessionConfig,
AbstractQueryApiServlet.ResultStatus.FAILED);
- ResultUtil.printError(sessionConfig.out(), e);
+ ResultUtil.printStatus(sessionOutput,
AbstractQueryApiServlet.ResultStatus.FAILED);
+ ResultUtil.printError(sessionOutput.out(), e);
} else {
GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE,
resultDelivery.name() + " job with id " +
jobId.getValue() + " " + "failed", e);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 9c66f57..87a237d 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -19,17 +19,19 @@
package org.apache.asterix.hyracks.bootstrap;
+import static org.apache.asterix.algebra.base.ILangExtension.Language.AQL;
+import static org.apache.asterix.algebra.base.ILangExtension.Language.SQLPP;
import static
org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
import static
org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.api.http.ctx.StatementExecutorContext;
import org.apache.asterix.api.http.server.ApiServlet;
import org.apache.asterix.api.http.server.ClusterApiServlet;
import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet;
@@ -73,6 +75,7 @@
import org.apache.asterix.runtime.job.resource.JobCapacityController;
import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.application.IServiceContext;
@@ -95,6 +98,7 @@
protected ICCServiceContext ccServiceCtx;
protected CCExtensionManager ccExtensionManager;
protected IStorageComponentProvider componentProvider;
+ protected StatementExecutorContext statementExecutorCtx;
protected WebManager webManager;
protected CcApplicationContext appCtx;
private IJobCapacityController jobCapacityController;
@@ -124,9 +128,10 @@
ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
componentProvider = new StorageComponentProvider();
GlobalRecoveryManager.instantiate((HyracksConnection) getHcc(),
componentProvider);
+ statementExecutorCtx = new StatementExecutorContext();
appCtx = new CcApplicationContext(ccServiceCtx, getHcc(),
libraryManager, resourceIdManager,
() -> MetadataManager.INSTANCE,
GlobalRecoveryManager.instance(), ftStrategy,
- new ActiveLifecycleListener());
+ new ActiveLifecycleListener(), componentProvider);
ClusterStateManager.INSTANCE.setCcAppCtx(appCtx);
ccExtensionManager = new CCExtensionManager(getExtensions());
appCtx.setExtensionManager(ccExtensionManager);
@@ -184,7 +189,7 @@
IHyracksClientConnection hcc = getHcc();
webServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
webServer.addServlet(new ApiServlet(webServer.ctx(), new String[] {
"/*" }, appCtx,
- ccExtensionManager.getAqlCompilationProvider(),
ccExtensionManager.getSqlppCompilationProvider(),
+ ccExtensionManager.getCompilationProvider(AQL),
ccExtensionManager.getCompilationProvider(SQLPP),
getStatementExecutorFactory(), componentProvider));
return webServer;
}
@@ -197,6 +202,8 @@
jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, appCtx);
jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR,
ccServiceCtx.getControllerService().getExecutor());
+ jsonAPIServer.setAttribute(ServletConstants.RUNNING_QUERIES_ATTR,
statementExecutorCtx);
+ jsonAPIServer.setAttribute(ServletConstants.SERVICE_CONTEXT_ATTR,
ccServiceCtx);
// AQL rest APIs.
addServlet(jsonAPIServer, Servlets.AQL_QUERY);
@@ -241,28 +248,28 @@
protected IServlet createServlet(ConcurrentMap<String, Object> ctx, String
key, String... paths) {
switch (key) {
case Servlets.AQL:
- return new FullApiServlet(ctx, paths, appCtx,
ccExtensionManager.getAqlCompilationProvider(),
+ return new FullApiServlet(ctx, paths, appCtx,
ccExtensionManager.getCompilationProvider(AQL),
getStatementExecutorFactory(), componentProvider);
case Servlets.AQL_QUERY:
- return new QueryApiServlet(ctx, paths, appCtx,
ccExtensionManager.getAqlCompilationProvider(),
+ return new QueryApiServlet(ctx, paths, appCtx,
ccExtensionManager.getCompilationProvider(AQL),
getStatementExecutorFactory(), componentProvider);
case Servlets.AQL_UPDATE:
- return new UpdateApiServlet(ctx, paths, appCtx,
ccExtensionManager.getAqlCompilationProvider(),
+ return new UpdateApiServlet(ctx, paths, appCtx,
ccExtensionManager.getCompilationProvider(AQL),
getStatementExecutorFactory(), componentProvider);
case Servlets.AQL_DDL:
- return new DdlApiServlet(ctx, paths, appCtx,
ccExtensionManager.getAqlCompilationProvider(),
+ return new DdlApiServlet(ctx, paths, appCtx,
ccExtensionManager.getCompilationProvider(AQL),
getStatementExecutorFactory(), componentProvider);
case Servlets.SQLPP:
- return new FullApiServlet(ctx, paths, appCtx,
ccExtensionManager.getSqlppCompilationProvider(),
+ return new FullApiServlet(ctx, paths, appCtx,
ccExtensionManager.getCompilationProvider(SQLPP),
getStatementExecutorFactory(), componentProvider);
case Servlets.SQLPP_QUERY:
- return new QueryApiServlet(ctx, paths, appCtx,
ccExtensionManager.getSqlppCompilationProvider(),
+ return new QueryApiServlet(ctx, paths, appCtx,
ccExtensionManager.getCompilationProvider(SQLPP),
getStatementExecutorFactory(), componentProvider);
case Servlets.SQLPP_UPDATE:
- return new UpdateApiServlet(ctx, paths, appCtx,
ccExtensionManager.getSqlppCompilationProvider(),
+ return new UpdateApiServlet(ctx, paths, appCtx,
ccExtensionManager.getCompilationProvider(SQLPP),
getStatementExecutorFactory(), componentProvider);
case Servlets.SQLPP_DDL:
- return new DdlApiServlet(ctx, paths, appCtx,
ccExtensionManager.getSqlppCompilationProvider(),
+ return new DdlApiServlet(ctx, paths, appCtx,
ccExtensionManager.getCompilationProvider(SQLPP),
getStatementExecutorFactory(), componentProvider);
case Servlets.RUNNING_REQUESTS:
return new QueryCancellationServlet(ctx, paths);
@@ -271,7 +278,7 @@
case Servlets.QUERY_RESULT:
return new QueryResultApiServlet(ctx, paths, appCtx);
case Servlets.QUERY_SERVICE:
- return new QueryServiceServlet(ctx, paths, appCtx,
ccExtensionManager.getSqlppCompilationProvider(),
+ return new QueryServiceServlet(ctx, paths, appCtx, SQLPP,
ccExtensionManager.getCompilationProvider(SQLPP),
getStatementExecutorFactory(), componentProvider);
case Servlets.CONNECTOR:
return new ConnectorApiServlet(ctx, paths, appCtx);
@@ -292,13 +299,17 @@
}
}
- private IStatementExecutorFactory getStatementExecutorFactory() {
+ public IStatementExecutorFactory getStatementExecutorFactory() {
return
ccExtensionManager.getStatementExecutorFactory(ccServiceCtx.getControllerService().getExecutor());
+ }
+
+ public IStatementExecutorContext getStatementExecutorContext() {
+ return statementExecutorCtx;
}
@Override
public void startupCompleted() throws Exception {
- ccServiceCtx.getControllerService().getExecutor().submit((Callable) ()
-> {
+ ccServiceCtx.getControllerService().getExecutor().submit(() -> {
ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE);
ClusterManagerProvider.getClusterManager().notifyStartupCompleted();
return null;
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 9c24acf..c7b354e 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -49,21 +49,26 @@
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.job.resource.NodeCapacity;
import org.apache.hyracks.api.messages.IMessageBroker;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.nc.BaseNCApplication;
import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.http.server.WebManager;
public class NCApplication extends BaseNCApplication {
private static final Logger LOGGER =
Logger.getLogger(NCApplication.class.getName());
- private INCServiceContext ncServiceCtx;
+ protected INCServiceContext ncServiceCtx;
private INcApplicationContext runtimeContext;
private String nodeId;
private boolean stopInitiated = false;
private SystemState systemState;
+ protected WebManager webManager;
@Override
public void registerConfig(IConfigManager configManager) {
@@ -122,6 +127,8 @@
localResourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName());
}
+ webManager = new WebManager();
+
performLocalCleanUp();
}
@@ -129,6 +136,9 @@
protected void configureLoggingLevel(Level level) {
super.configureLoggingLevel(level);
Logger.getLogger("org.apache.asterix").setLevel(level);
+ }
+
+ protected void configureServers() throws Exception {
}
protected List<AsterixExtension> getExtensions() {
@@ -143,6 +153,9 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Stopping Asterix node controller: " + nodeId);
}
+
+ webManager.stop();
+
//Clean any temporary files
performLocalCleanUp();
@@ -163,6 +176,10 @@
@Override
public void startupCompleted() throws Exception {
+ // configure servlets after joining the cluster, so we can create
HyracksClientConnection
+ configureServers();
+ webManager.start();
+
// Since we don't pass initial run flag in
AsterixHyracksIntegrationUtil, we use the virtualNC flag
final NodeProperties nodeProperties =
runtimeContext.getNodeProperties();
if (systemState == SystemState.PERMANENT_DATA_LOSS
@@ -262,4 +279,10 @@
public INcApplicationContext getApplicationContext() {
return runtimeContext;
}
+
+ protected IHyracksClientConnection getHcc() throws Exception {
+ NodeControllerService ncSrv = (NodeControllerService)
ncServiceCtx.getControllerService();
+ ClusterControllerInfo ccInfo =
ncSrv.getNodeParameters().getClusterControllerInfo();
+ return new HyracksConnection(ccInfo.getClientNetAddress(),
ccInfo.getClientNetPort());
+ }
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 630aabe..5e9bcc0 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -20,6 +20,10 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -44,6 +48,7 @@
private final LinkedBlockingQueue<INcAddressedMessage> receivedMsgsQ;
private final ConcurrentFramePool messagingFramePool;
private final int maxMsgSize;
+ private final Map<UUID, CompletableFuture<IMessage>> futureMap;
public NCMessageBroker(NodeControllerService ncs, MessagingProperties
messagingProperties) {
this.ncs = ncs;
@@ -53,6 +58,7 @@
messagingFramePool = new ConcurrentFramePool(ncs.getId(),
messagingMemoryBudget,
messagingProperties.getFrameSize());
receivedMsgsQ = new LinkedBlockingQueue<>();
+ futureMap = new HashMap<>();
MessageDeliveryService msgDeliverySvc = new MessageDeliveryService();
appContext.getThreadExecutor().execute(msgDeliverySvc);
}
@@ -104,6 +110,32 @@
ccb.getWriteInterface().getFullBufferAcceptor().accept(msgBuffer);
}
+ @Override
+ public CompletableFuture<IMessage> registerFuture(UUID msgId) {
+ if (msgId == null) {
+ throw new NullPointerException();
+ }
+
+ synchronized (futureMap) {
+ if (futureMap.containsKey(msgId)) {
+ throw new IllegalArgumentException(String.valueOf(msgId));
+ }
+ CompletableFuture<IMessage> future = new CompletableFuture<>();
+ futureMap.put(msgId, future);
+ return future;
+ }
+ }
+
+ @Override
+ public CompletableFuture<IMessage> deregisterFuture(UUID msgId) {
+ if (msgId == null) {
+ throw new NullPointerException();
+ }
+ synchronized (futureMap) {
+ return futureMap.remove(msgId);
+ }
+ }
+
private class MessageDeliveryService implements Runnable {
/*
* TODO Currently this thread is not stopped when it is interrupted
because
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index bffaeef..d1ff871 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -69,7 +69,7 @@
import org.apache.asterix.runtime.utils.RuntimeUtils;
import org.apache.asterix.translator.CompiledStatements;
import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -151,7 +151,7 @@
return spec;
}
- private static JobSpecification getConnectionJob(SessionConfig
sessionConfig, MetadataProvider metadataProvider,
+ private static JobSpecification getConnectionJob(SessionOutput
sessionOutput, MetadataProvider metadataProvider,
FeedConnection feedConnection, String[] locations,
ILangCompilationProvider compilationProvider,
IStorageComponentProvider storageComponentProvider,
DefaultStatementExecutorFactory qtFactory,
IHyracksClientConnection hcc) throws AlgebricksException,
RemoteException, ACIDException {
@@ -165,7 +165,7 @@
statements.add(dataverseDecl);
statements.add(subscribeStmt);
IStatementExecutor translator =
qtFactory.create(metadataProvider.getApplicationContext(), statements,
- sessionConfig, compilationProvider, storageComponentProvider);
+ sessionOutput, compilationProvider, storageComponentProvider);
// configure the metadata provider
metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, "" +
Boolean.TRUE);
metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME,
"" + subscribeStmt.getPolicy());
@@ -357,7 +357,7 @@
}
public static Pair<JobSpecification,
AlgebricksAbsolutePartitionConstraint> buildStartFeedJob(
- SessionConfig sessionConfig, MetadataProvider metadataProvider,
Feed feed,
+ SessionOutput sessionOutput, MetadataProvider metadataProvider,
Feed feed,
List<FeedConnection> feedConnections, ILangCompilationProvider
compilationProvider,
IStorageComponentProvider storageComponentProvider,
DefaultStatementExecutorFactory qtFactory,
IHyracksClientConnection hcc) throws Exception {
@@ -372,7 +372,7 @@
String[] ingestionLocations =
ingestionAdaptorFactory.getPartitionConstraint().getLocations();
// Add connection job
for (FeedConnection feedConnection : feedConnections) {
- JobSpecification connectionJob = getConnectionJob(sessionConfig,
metadataProvider, feedConnection,
+ JobSpecification connectionJob = getConnectionJob(sessionOutput,
metadataProvider, feedConnection,
ingestionLocations, compilationProvider,
storageComponentProvider, qtFactory, hcc);
jobsList.add(connectionJob);
}
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
index e2885b3..5e3da5f 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/aql/translator/QueryTranslatorTest.java
@@ -39,7 +39,7 @@
import org.apache.asterix.lang.common.statement.RunStatement;
import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.SessionConfig;
+import org.apache.asterix.translator.SessionOutput;
import org.junit.Assert;
import org.junit.Test;
@@ -51,7 +51,7 @@
@Test
public void test() throws Exception {
List<Statement> statements = new ArrayList<>();
- SessionConfig mockSessionConfig = mock(SessionConfig.class);
+ SessionOutput mockSessionOutput = mock(SessionOutput.class);
RunStatement mockRunStatement = mock(RunStatement.class);
// Mocks AppContextInfo.
@@ -70,7 +70,7 @@
when(mockMasterNode.getClientIp()).thenReturn("127.0.0.1");
IStatementExecutor aqlTranslator = new
DefaultStatementExecutorFactory().create(mockAsterixAppContextInfo,
- statements, mockSessionConfig, new AqlCompilationProvider(),
new StorageComponentProvider());
+ statements, mockSessionOutput, new AqlCompilationProvider(),
new StorageComponentProvider());
List<String> parameters = new ArrayList<>();
parameters.add("examples/pregelix-example-jar-with-dependencies.jar");
parameters.add("org.apache.pregelix.example.PageRankVertex");
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 8457681..02b7d20 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -32,11 +32,13 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Inet4Address;
+import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -117,17 +119,24 @@
/*
* Instance members
*/
- protected final String host;
- protected final int port;
+ protected final List<InetSocketAddress> endpoints;
+ protected int endpointSelector;
protected ITestLibrarian librarian;
-
- public TestExecutor(String host, int port) {
- this.host = host;
- this.port = port;
- }
public TestExecutor() {
this(Inet4Address.getLoopbackAddress().getHostAddress(), 19002);
+ }
+
+ public TestExecutor(String host, int port) {
+ this(InetSocketAddress.createUnresolved(host, port));
+ }
+
+ public TestExecutor(InetSocketAddress endpoint) {
+ this(Collections.singletonList(endpoint));
+ }
+
+ public TestExecutor(List<InetSocketAddress> endpoints) {
+ this.endpoints = endpoints;
}
public void setLibrarian(ITestLibrarian librarian) {
@@ -367,7 +376,7 @@
continue;
}
throw new Exception(
- "Result for " + scriptFile + ": expected pattern '" +
expression + "' not found in result.");
+ "Result for " + scriptFile + ": expected pattern '" +
expression + "' not found in result: "+actual);
}
} catch (Exception e) {
System.err.println("Actual results file: " +
actualFile.toString());
@@ -1122,7 +1131,7 @@
protected InputStream executeHttp(String ctxType, String endpoint,
OutputFormat fmt) throws Exception {
String[] split = endpoint.split("\\?");
- URI uri = new URI("http", null, host, port, split[0], split.length > 1
? split[1] : null, null);
+ URI uri = createEndpointURI(split[0], split.length > 1 ? split[1] :
null);
return executeURI(ctxType, uri, fmt);
}
@@ -1141,7 +1150,7 @@
//get node process id
OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
String endpoint = "/admin/cluster/node/" + nodeId + "/config";
- InputStream executeJSONGet = executeJSONGet(fmt, new URI("http", null,
host, port, endpoint, null, null));
+ InputStream executeJSONGet = executeJSONGet(fmt,
createEndpointURI(endpoint, null));
StringWriter actual = new StringWriter();
IOUtils.copy(executeJSONGet, actual, StandardCharsets.UTF_8);
String config = actual.toString();
@@ -1158,7 +1167,7 @@
private void deleteNCTxnLogs(String nodeId, CompilationUnit cUnit) throws
Exception {
OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit);
String endpoint = "/admin/cluster/node/" + nodeId + "/config";
- InputStream executeJSONGet = executeJSONGet(fmt, new URI("http://" +
host + ":" + port + endpoint));
+ InputStream executeJSONGet = executeJSONGet(fmt,
createEndpointURI(endpoint, null));
StringWriter actual = new StringWriter();
IOUtils.copy(executeJSONGet, actual, StandardCharsets.UTF_8);
String config = actual.toString();
@@ -1237,8 +1246,16 @@
+ "_" + cUnit.getName() + "_qbc.adm");
}
+ protected URI createEndpointURI(String path, String query) throws
URISyntaxException {
+ int endpointIdx = Math.abs(endpointSelector++ % endpoints.size());
+ InetSocketAddress endpoint = endpoints.get(endpointIdx);
+ URI uri = new URI("http", null, endpoint.getHostString(),
endpoint.getPort(), path, query, null);
+ LOGGER.fine("Created endpoint URI: " + uri);
+ return uri;
+ }
+
protected URI getEndpoint(String servlet) throws URISyntaxException {
- return new URI("http", null, host, port,
getPath(servlet).replaceAll("/\\*$", ""), null, null);
+ return createEndpointURI(getPath(servlet).replaceAll("/\\*$", ""),
null);
}
public static String stripJavaComments(String text) {
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
new file mode 100644
index 0000000..9a2ad3c
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred/AsyncDeferredQueries.xml
@@ -0,0 +1,52 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-group name="async-deferred">
+ <test-case FilePath="async-deferred">
+ <compilation-unit name="async-failed">
+ <output-dir compare="Text">async-failed</output-dir>
+ <expected-error>Injected failure in
asterix:inject-failure</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="async-deferred">
+ <compilation-unit name="async-compilation-failed">
+ <output-dir compare="Text">async-compilation-failed</output-dir>
+ <expected-error>Cannot find dataset gargel</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="async-deferred">
+ <compilation-unit name="deferred">
+ <output-dir compare="Text">deferred</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="async-deferred">
+ <compilation-unit name="async">
+ <output-dir compare="Text">async</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="async-deferred">
+ <compilation-unit name="async-repeated">
+ <output-dir compare="Text">async-repeated</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="async-deferred">
+ <compilation-unit name="async-running">
+ <output-dir compare="Text">async-running</output-dir>
+ </compilation-unit>
+ </test-case>
+</test-group>
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index c2b0951..e9de123 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -18,43 +18,10 @@
!-->
<!DOCTYPE test-suite [
<!ENTITY RecordsQueries SYSTEM "queries_sqlpp/objects/ObjectsQueries.xml">
-
+ <!ENTITY AsyncDeferredQueries SYSTEM
"queries_sqlpp/async-deferred/AsyncDeferredQueries.xml">
]>
<test-suite xmlns="urn:xml.testframework.asterix.apache.org"
ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp"
QueryFileExtension=".sqlpp">
- <test-group name="async-deferred">
- <test-case FilePath="async-deferred">
- <compilation-unit name="async-failed">
- <output-dir compare="Text">async-failed</output-dir>
- <expected-error>Injected failure in
asterix:inject-failure</expected-error>
- </compilation-unit>
- </test-case>
- <test-case FilePath="async-deferred">
- <compilation-unit name="async-compilation-failed">
- <output-dir compare="Text">async-compilation-failed</output-dir>
- <expected-error>Cannot find dataset gargel</expected-error>
- </compilation-unit>
- </test-case>
- <test-case FilePath="async-deferred">
- <compilation-unit name="deferred">
- <output-dir compare="Text">deferred</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="async-deferred">
- <compilation-unit name="async">
- <output-dir compare="Text">async</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="async-deferred">
- <compilation-unit name="async-repeated">
- <output-dir compare="Text">async-repeated</output-dir>
- </compilation-unit>
- </test-case>
- <test-case FilePath="async-deferred">
- <compilation-unit name="async-running">
- <output-dir compare="Text">async-running</output-dir>
- </compilation-unit>
- </test-case>
- </test-group>
+ &AsyncDeferredQueries;
<test-group name="flwor">
<test-case FilePath="flwor">
<compilation-unit name="at00">
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index a9e6448..4bc73bd 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -20,6 +20,7 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
+import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.transactions.IResourceIdManager;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -73,4 +74,8 @@
public IHyracksClientConnection getHcc();
public IResourceIdManager getResourceIdManager();
+
+ public IStorageComponentProvider getStorageComponentProvider();
+
+ public Object getExtensionManager();
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
index e1101b3..58dc48b 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
@@ -18,6 +18,10 @@
*/
package org.apache.asterix.common.messaging.api;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hyracks.api.messages.IMessage;
import org.apache.hyracks.api.messages.IMessageBroker;
public interface INCMessageBroker extends IMessageBroker {
@@ -26,7 +30,6 @@
* Sends application message from this NC to the CC.
*
* @param message
- * @param callback
* @throws Exception
*/
public void sendMessageToCC(ICcAddressedMessage message) throws Exception;
@@ -35,7 +38,6 @@
* Sends application message from this NC to another NC.
*
* @param message
- * @param callback
* @throws Exception
*/
public void sendMessageToNC(String nodeId, INcAddressedMessage message)
@@ -47,4 +49,21 @@
* @param msg
*/
public void queueReceivedMessage(INcAddressedMessage msg);
+
+ /**
+ * Creates and registers a Future for a message that will be send through
this broker
+ * @param msgId message id
+ * @return new Future
+ * @throws NullPointerException if {@code msdId} is {@code null}
+ * @throws IllegalArgumentException if there is already a Future
associated with given message id
+ */
+ CompletableFuture<IMessage> registerFuture(UUID msgId);
+
+ /**
+ * Removes a previously registered Future
+ * @param msgId message id
+ * @return existing Future or {@code null} if there was no Future
associated with this message id
+ * @throws NullPointerException if {@code msdId} is {@code null}
+ */
+ CompletableFuture<IMessage> deregisterFuture(UUID msgId);
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 8608d68..a4c271c 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -34,6 +34,7 @@
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.config.TransactionProperties;
+import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.library.ILibraryManager;
@@ -54,6 +55,7 @@
public class CcApplicationContext implements ICcApplicationContext {
private ICCServiceContext ccServiceCtx;
+ private IStorageComponentProvider storageComponentProvider;
private IGlobalRecoveryManager globalRecoveryManager;
private ILibraryManager libraryManager;
private IResourceIdManager resourceIdManager;
@@ -77,7 +79,8 @@
public CcApplicationContext(ICCServiceContext ccServiceCtx,
IHyracksClientConnection hcc,
ILibraryManager libraryManager, IResourceIdManager
resourceIdManager,
Supplier<IMetadataBootstrap> metadataBootstrapSupplier,
IGlobalRecoveryManager globalRecoveryManager,
- IFaultToleranceStrategy ftStrategy, IJobLifecycleListener
activeLifeCycleListener)
+ IFaultToleranceStrategy ftStrategy, IJobLifecycleListener
activeLifeCycleListener,
+ IStorageComponentProvider storageComponentProvider)
throws AsterixException, IOException {
this.ccServiceCtx = ccServiceCtx;
this.hcc = hcc;
@@ -102,6 +105,7 @@
this.nodeProperties = new NodeProperties(propertiesAccessor);
this.metadataBootstrapSupplier = metadataBootstrapSupplier;
this.globalRecoveryManager = globalRecoveryManager;
+ this.storageComponentProvider = storageComponentProvider;
}
@Override
@@ -174,6 +178,7 @@
return libraryManager;
}
+ @Override
public Object getExtensionManager() {
return extensionManager;
}
@@ -213,4 +218,9 @@
public IJobLifecycleListener getActiveLifecycleListener() {
return activeLifeCycleListener;
}
+
+ @Override
+ public IStorageComponentProvider getStorageComponentProvider() {
+ return storageComponentProvider;
+ }
}
diff --git
a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
index f34dc6a..ebd945e 100644
---
a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
+++
b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/xml/TestSuiteParser.java
@@ -20,14 +20,29 @@
import java.io.File;
+import javax.xml.XMLConstants;
import javax.xml.bind.JAXBContext;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.parsers.SAXParser;
+import javax.xml.parsers.SAXParserFactory;
+import javax.xml.transform.sax.SAXSource;
+
+import org.xml.sax.InputSource;
public class TestSuiteParser {
public TestSuiteParser() {
}
public org.apache.asterix.testframework.xml.TestSuite parse(File
testSuiteCatalog) throws Exception {
+ SAXParserFactory saxParserFactory = SAXParserFactory.newInstance();
+ saxParserFactory.setNamespaceAware(true);
+ saxParserFactory.setXIncludeAware(true);
+ SAXParser saxParser = saxParserFactory.newSAXParser();
+ saxParser.setProperty(XMLConstants.ACCESS_EXTERNAL_DTD, "file");
+
JAXBContext ctx =
JAXBContext.newInstance(org.apache.asterix.testframework.xml.TestSuite.class);
- return (org.apache.asterix.testframework.xml.TestSuite)
ctx.createUnmarshaller().unmarshal(testSuiteCatalog);
+ Unmarshaller um = ctx.createUnmarshaller();
+ return (org.apache.asterix.testframework.xml.TestSuite)
um.unmarshal(new SAXSource(saxParser.getXMLReader(),
+ new InputSource(testSuiteCatalog.toURI().toString())));
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index cdfc492..3000a75 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.http.server;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -47,6 +48,8 @@
// Constants
private static final int LOW_WRITE_BUFFER_WATER_MARK = 8 * 1024;
private static final int HIGH_WRITE_BUFFER_WATER_MARK = 32 * 1024;
+ static final WriteBufferWaterMark WRITE_BUFFER_WATER_MARK =
+ new WriteBufferWaterMark(LOW_WRITE_BUFFER_WATER_MARK,
HIGH_WRITE_BUFFER_WATER_MARK);
private static final Logger LOGGER =
Logger.getLogger(HttpServer.class.getName());
private static final int FAILED = -1;
private static final int STOPPED = 0;
@@ -61,6 +64,7 @@
private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup;
private final int port;
+ private final InetSocketAddress proxyToAddress;
private final ExecutorService executor;
// Mutable members
private volatile int state = STOPPED;
@@ -73,9 +77,15 @@
public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup,
int port, int numExecutorThreads,
int requestQueueSize) {
+ this(bossGroup, workerGroup, port, numExecutorThreads,
requestQueueSize, null);
+ }
+
+ public HttpServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup,
int port, int numExecutorThreads,
+ int requestQueueSize, InetSocketAddress proxyToAddress) {
this.bossGroup = bossGroup;
this.workerGroup = workerGroup;
this.port = port;
+ this.proxyToAddress = proxyToAddress;
ctx = new ConcurrentHashMap<>();
servlets = new ArrayList<>();
executor = new ThreadPoolExecutor(numExecutorThreads,
numExecutorThreads, 0L, TimeUnit.MILLISECONDS,
@@ -192,8 +202,7 @@
Collections.sort(servlets, (l1, l2) -> l2.getPaths()[0].length() -
l1.getPaths()[0].length());
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
- .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
- new WriteBufferWaterMark(LOW_WRITE_BUFFER_WATER_MARK,
HIGH_WRITE_BUFFER_WATER_MARK))
+ .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
WRITE_BUFFER_WATER_MARK)
.handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new
HttpServerInitializer(this));
channel = b.bind(port).sync().channel();
}
@@ -225,7 +234,6 @@
}
}
}
- LOGGER.warning("No servlet for " + uri);
return null;
}
@@ -257,4 +265,12 @@
public ExecutorService getExecutor() {
return executor;
}
+
+ public EventLoopGroup getWorkerGroup() {
+ return workerGroup;
+ }
+
+ InetSocketAddress getProxyToAddress() {
+ return proxyToAddress;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
index 6f7ccba..de267ba 100644
---
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServerHandler.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.http.server;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -27,12 +28,20 @@
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.server.utils.HttpUtil;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpRequestEncoder;
+import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
public class HttpServerHandler extends SimpleChannelInboundHandler<Object> {
@@ -64,19 +73,33 @@
FullHttpRequest request = (FullHttpRequest) msg;
try {
IServlet servlet = server.getServlet(request);
- if (servlet == null) {
- respond(ctx, request, HttpResponseStatus.NOT_FOUND);
- } else {
+ if (servlet != null) {
submit(ctx, servlet, request);
+ return;
}
+
+ InetSocketAddress proxyToAddress = server.getProxyToAddress();
+ if (proxyToAddress != null) {
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Proxy " + request.uri() + " to " +
proxyToAddress);
+ }
+ proxy(ctx, request, proxyToAddress);
+ return;
+ }
+
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("No servlet for " + request.uri());
+ }
+ respond(ctx, request.protocolVersion(),
HttpResponseStatus.NOT_FOUND);
+
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Failure Submitting HTTP Request", e);
- respond(ctx, request, HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ respond(ctx, request.protocolVersion(),
HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
}
- private void respond(ChannelHandlerContext ctx, FullHttpRequest request,
HttpResponseStatus status) {
- DefaultHttpResponse response = new
DefaultHttpResponse(request.protocolVersion(), status);
+ private void respond(ChannelHandlerContext ctx, HttpVersion httpVersion,
HttpResponseStatus status) {
+ DefaultHttpResponse response = new DefaultHttpResponse(httpVersion,
status);
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
}
@@ -86,7 +109,7 @@
servletRequest = HttpUtil.toServletRequest(request);
} catch (IllegalArgumentException e) {
LOGGER.log(Level.WARNING, "Failure Decoding Request", e);
- respond(ctx, request, HttpResponseStatus.BAD_REQUEST);
+ respond(ctx, request.protocolVersion(),
HttpResponseStatus.BAD_REQUEST);
return;
}
handler = new HttpRequestHandler(ctx, servlet, servletRequest,
chunkSize);
@@ -102,6 +125,52 @@
}
}
+ private void proxy(ChannelHandlerContext ctx, FullHttpRequest request,
InetSocketAddress toAddress) {
+ HttpVersion requestHttpVersion = request.protocolVersion();
+ Channel clientChannel = ctx.channel();
+
+ Bootstrap b = new Bootstrap();
+ b.group(server.getWorkerGroup());
+ b.channel(clientChannel.getClass());
+ b.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
HttpServer.WRITE_BUFFER_WATER_MARK);
+ b.option(ChannelOption.AUTO_READ, false);
+ b.handler(new ChannelInitializer<Channel>() {
+ @Override
+ protected void initChannel(Channel proxyChannel) throws Exception {
+ ChannelPipeline p = proxyChannel.pipeline();
+ p.addLast(new HttpRequestEncoder());
+ p.addLast(new HttpResponseDecoder());
+ p.addLast(new ProxyHandler(clientChannel));
+ }
+ });
+
+ request.retain();
+
+ b.connect(toAddress).addListener((ChannelFutureListener)
proxyConnectFuture -> {
+ if (proxyConnectFuture.isSuccess()) {
+ proxyConnectFuture.channel().writeAndFlush(request)
+ .addListener((ChannelFutureListener) proxyWriteFuture
-> {
+ Channel proxyChannel = proxyWriteFuture.channel();
+ if (proxyWriteFuture.isSuccess()) {
+ proxyChannel.read();
+ } else {
+ proxyChannel.close();
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("[Proxy] Failure writing to "
+ toAddress);
+ }
+ respond(ctx, requestHttpVersion,
HttpResponseStatus.NOT_FOUND);
+ }
+ });
+ } else {
+ request.release();
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("[Proxy] Failure connecting to " + toAddress);
+ }
+ respond(ctx, requestHttpVersion, HttpResponseStatus.NOT_FOUND);
+ }
+ });
+ }
+
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOGGER.log(Level.SEVERE, "Failure handling HTTP Request", cause);
diff --git
a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ProxyHandler.java
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ProxyHandler.java
new file mode 100644
index 0000000..6abe6a7
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/ProxyHandler.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.http.server;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+final class ProxyHandler extends ChannelInboundHandlerAdapter {
+
+ private static final Logger LOGGER =
Logger.getLogger(ProxyHandler.class.getName());
+
+ private final Channel clientChannel;
+
+ ProxyHandler(Channel clientChannel) {
+ this.clientChannel = clientChannel;
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext proxyCtx, Object msg) throws
Exception {
+ clientChannel.writeAndFlush(msg).addListener((ChannelFutureListener)
clientWriteFuture -> {
+ if (clientWriteFuture.isSuccess()) {
+ proxyCtx.channel().read();
+ } else {
+ clientChannel.close();
+ }
+ });
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext proxyCtx) throws
Exception {
+ closeClientChannel();
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext proxyCtx, Throwable
cause) throws Exception {
+ LOGGER.log(Level.FINE, "[Proxy] Failure in proxy channel", cause);
+ proxyCtx.channel().close();
+ closeClientChannel();
+ }
+
+ private void closeClientChannel() {
+ if (clientChannel.isActive()) {
+
clientChannel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1709
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I19414a23e163fc4deef9805c8f9089609f1ebe07
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Till Westmann <[email protected]>