Till Westmann has submitted this change and it was merged. Change subject: Deferred result retrieval for the QueryService ......................................................................
Deferred result retrieval for the QueryService And some cleanup: - fewer JSONExceptions on interfaces - rename ResultDelivery options (existing HTTP APIs still work) SYNC -> IMMEDIATE ASYNC_DEFERRED -> DEFERRED ASYNC -> ASYNC - shorten variables queryMetadataProvider -> metadataProvider aqlStatements -> statements compiled -> jobSpec - rename ResultUtil.displayResults to printResults Change-Id: I72d53be824d8dbf1d9f547b01f19097d0dc18add Reviewed-on: https://asterix-gerrit.ics.uci.edu/1373 Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Ian Maxon <[email protected]> --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/APIServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryResultAPIServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryServiceServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultHandle.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/ResultExtractor.java M asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java 15 files changed, 228 insertions(+), 117 deletions(-) Approvals: Ian Maxon: Looks good to me, approved Jenkins: Verified; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java index 7783121..a1f3055 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java @@ -29,7 +29,6 @@ import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.dataset.IHyracksDataset; import org.apache.hyracks.api.job.JobSpecification; -import org.json.JSONException; /** * An interface that takes care of executing a list of statements that are submitted through an Asterix API @@ -39,19 +38,19 @@ /** * Specifies result delivery of executed statements */ - public enum ResultDelivery { + enum ResultDelivery { /** - * Wait for results to be read + * Results are returned with the first response */ - SYNC, + IMMEDIATE, /** - * Flush out result handle beofre waiting for the result + * Results are produced completely, but only a result handle is returned */ - ASYNC, + DEFERRED, /** - * Return result handle and don't wait for the result + * A result handle is returned before the resutlts are complete */ - ASYNC_DEFERRED + ASYNC } public static class Stats { @@ -116,17 +115,14 @@ * @param dmlStatement * The data modification statement when the query results in a modification to a dataset * @return the compiled {@code JobSpecification} - * @param returnQuery - * In the case of dml, the user may run a query on affected data * @throws AsterixException * @throws RemoteException * @throws AlgebricksException - * @throws JSONException * @throws ACIDException */ JobSpecification rewriteCompileQuery(MetadataProvider metadataProvider, Query query, ICompiledDmlStatement dmlStatement) - throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException; + throws AsterixException, RemoteException, AlgebricksException, ACIDException; /** * returns the active dataverse for an entity or a statement diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java index fc4f655..328f714 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/SessionConfig.java @@ -123,6 +123,8 @@ private final ResultDecorator preResultDecorator; private final ResultDecorator postResultDecorator; + private final ResultDecorator preHandleDecorator; + private final ResultDecorator postHandleDecorator; // Flags. private final Map<String, Boolean> flags; @@ -141,17 +143,19 @@ * Output format for execution output. */ public SessionConfig(PrintWriter out, OutputFormat fmt) { - this(out, fmt, null, null, true, true, true); + this(out, fmt, null, null, null, null, true, true, true); } public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator preResultDecorator, - ResultDecorator postResultDecorator) { - this(out, fmt, preResultDecorator, postResultDecorator, true, true, true); + ResultDecorator postResultDecorator, ResultDecorator preHandleDecorator, + ResultDecorator postHandleDecorator) { + this(out, fmt, preResultDecorator, postResultDecorator, preHandleDecorator, postHandleDecorator, true, true, + true); } public SessionConfig(PrintWriter out, OutputFormat fmt, boolean optimize, boolean executeQuery, boolean generateJobSpec) { - this(out, fmt, null, null, optimize, executeQuery, generateJobSpec); + this(out, fmt, null, null, null, null, optimize, executeQuery, generateJobSpec); } /** @@ -172,11 +176,14 @@ * false, job cannot be executed). */ public SessionConfig(PrintWriter out, OutputFormat fmt, ResultDecorator preResultDecorator, - ResultDecorator postResultDecorator, boolean optimize, boolean executeQuery, boolean generateJobSpec) { + ResultDecorator postResultDecorator, ResultDecorator preHandleDecorator, + ResultDecorator postHandleDecorator, boolean optimize, boolean executeQuery, boolean generateJobSpec) { this.out = out; this.fmt = fmt; this.preResultDecorator = preResultDecorator; this.postResultDecorator = postResultDecorator; + this.preHandleDecorator = preHandleDecorator; + this.postHandleDecorator = postHandleDecorator; this.optimize = optimize; this.executeQuery = executeQuery; this.generateJobSpec = generateJobSpec; @@ -199,12 +206,19 @@ public AlgebricksAppendable resultPrefix(AlgebricksAppendable app) throws AlgebricksException { return this.preResultDecorator != null ? this.preResultDecorator.append(app) : app; - }; + } public AlgebricksAppendable resultPostfix(AlgebricksAppendable app) throws AlgebricksException { return this.postResultDecorator != null ? this.postResultDecorator.append(app) : app; - }; + } + public AlgebricksAppendable handlePrefix(AlgebricksAppendable app) throws AlgebricksException { + return this.preHandleDecorator != null ? this.preHandleDecorator.append(app) : app; + } + + public AlgebricksAppendable handlePostfix(AlgebricksAppendable app) throws AlgebricksException { + return this.postHandleDecorator != null ? this.postHandleDecorator.append(app) : app; + } /** * Retrieve the value of the "execute query" flag. */ diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java index e08b3db..0d8df9e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java @@ -34,11 +34,11 @@ import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.compiler.provider.IRuleSetFactory; -import org.apache.asterix.dataflow.data.common.ExpressionTypeComputer; import org.apache.asterix.dataflow.data.common.AqlMergeAggregationExpressionFactory; import org.apache.asterix.dataflow.data.common.AqlMissableTypeComputer; import org.apache.asterix.dataflow.data.common.AqlPartialAggregationTypeComputer; import org.apache.asterix.dataflow.data.common.ConflictingTypeResolver; +import org.apache.asterix.dataflow.data.common.ExpressionTypeComputer; import org.apache.asterix.formats.base.IDataFormat; import org.apache.asterix.jobgen.QueryLogicalExpressionJobGen; import org.apache.asterix.lang.common.base.IAstPrintVisitorFactory; @@ -151,9 +151,9 @@ } public JobSpecification compileQuery(List<FunctionDecl> declaredFunctions, - MetadataProvider queryMetadataProvider, Query rwQ, int varCounter, String outputDatasetName, + MetadataProvider metadataProvider, Query rwQ, int varCounter, String outputDatasetName, SessionConfig conf, ICompiledDmlStatement statement) - throws AlgebricksException, JSONException, RemoteException, ACIDException { + throws AlgebricksException, RemoteException, ACIDException { if (!conf.is(SessionConfig.FORMAT_ONLY_PHYSICAL_OPS) && conf.is(SessionConfig.OOB_REWRITTEN_EXPR_TREE)) { conf.out().println(); @@ -166,9 +166,9 @@ } org.apache.asterix.common.transactions.JobId asterixJobId = JobIdFactory.generateJobId(); - queryMetadataProvider.setJobId(asterixJobId); + metadataProvider.setJobId(asterixJobId); ILangExpressionToPlanTranslator t = - translatorFactory.createExpressionToPlanTranslator(queryMetadataProvider, varCounter); + translatorFactory.createExpressionToPlanTranslator(metadataProvider, varCounter); ILogicalPlan plan; // statement = null when it's a query @@ -211,7 +211,7 @@ builder.setPhysicalOptimizationConfig(OptimizationConfUtil.getPhysicalOptimizationConfig()); builder.setLogicalRewrites(ruleSetFactory.getLogicalRewrites()); builder.setPhysicalRewrites(ruleSetFactory.getPhysicalRewrites()); - IDataFormat format = queryMetadataProvider.getFormat(); + IDataFormat format = metadataProvider.getFormat(); ICompilerFactory compilerFactory = builder.create(); builder.setExpressionEvalSizeComputer(format.getExpressionEvalSizeComputer()); builder.setIMergeAggregationExpressionFactory(new AqlMergeAggregationExpressionFactory()); @@ -219,9 +219,9 @@ builder.setExpressionTypeComputer(ExpressionTypeComputer.INSTANCE); builder.setMissableTypeComputer(AqlMissableTypeComputer.INSTANCE); builder.setConflictingTypeResolver(ConflictingTypeResolver.INSTANCE); - builder.setClusterLocations(queryMetadataProvider.getClusterLocations()); + builder.setClusterLocations(metadataProvider.getClusterLocations()); - ICompiler compiler = compilerFactory.createCompiler(plan, queryMetadataProvider, t.getVarCounter()); + ICompiler compiler = compilerFactory.createCompiler(plan, metadataProvider, t.getVarCounter()); if (conf.isOptimize()) { compiler.optimize(); //plot optimized logical plan @@ -247,7 +247,7 @@ try { LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor(); PlanPrettyPrinter.printPlan(plan, pvisitor, 0); - ResultUtil.displayResults(pvisitor.get().toString(), conf, new Stats(), null); + ResultUtil.printResults(pvisitor.get().toString(), conf, new Stats(), null); return null; } catch (IOException e) { throw new AlgebricksException(e); @@ -291,13 +291,17 @@ builder.setNormalizedKeyComputerFactoryProvider(format.getNormalizedKeyComputerFactoryProvider()); JobEventListenerFactory jobEventListenerFactory = - new JobEventListenerFactory(asterixJobId, queryMetadataProvider.isWriteTransaction()); + new JobEventListenerFactory(asterixJobId, metadataProvider.isWriteTransaction()); JobSpecification spec = compiler.createJob(AsterixAppContextInfo.INSTANCE, jobEventListenerFactory); if (conf.is(SessionConfig.OOB_HYRACKS_JOB)) { printPlanPrefix(conf, "Hyracks job"); if (rwQ != null) { - conf.out().println(spec.toJSON().toString(1)); + try { + conf.out().println(spec.toJSON().toString(1)); + } catch (JSONException e) { + throw new AlgebricksException(e); + } conf.out().println(spec.getUserConstraints()); } printPlanPostfix(conf); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/APIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/APIServlet.java index 4e9bb25..b693407 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/APIServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/APIServlet.java @@ -129,7 +129,7 @@ statementExectorFactory.create(aqlStatements, sessionConfig, compilationProvider); double duration = 0; long startTime = System.currentTimeMillis(); - translator.compileAndExecute(hcc, hds, IStatementExecutor.ResultDelivery.SYNC); + translator.compileAndExecute(hcc, hds, IStatementExecutor.ResultDelivery.IMMEDIATE); long endTime = System.currentTimeMillis(); duration = (endTime - startTime) / 1000.00; out.println(HTML_STATEMENT_SEPARATOR); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryResultAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryResultAPIServlet.java index a572500..9994bc7 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryResultAPIServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryResultAPIServlet.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.io.PrintWriter; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.servlet.ServletContext; import javax.servlet.http.HttpServlet; @@ -31,6 +33,7 @@ import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.app.result.ResultUtil; +import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.translator.IStatementExecutor.Stats; import org.apache.asterix.translator.SessionConfig; import org.apache.hyracks.api.client.HyracksConnection; @@ -45,9 +48,12 @@ public class QueryResultAPIServlet extends HttpServlet { private static final long serialVersionUID = 1L; + private static final Logger LOGGER = Logger.getLogger(QueryResultAPIServlet.class.getName()); + @Override public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException { - response.setContentType("text/html"); + int respCode = HttpServletResponse.SC_OK; + response.setContentType("text/html"); // TODO this seems wrong ... response.setCharacterEncoding("utf-8"); String strHandle = request.getParameter("handle"); PrintWriter out = response.getWriter(); @@ -56,6 +62,10 @@ IHyracksDataset hds; try { + if (strHandle == null || strHandle.isEmpty()) { + throw new AsterixException("Empty request, no handle provided"); + } + HyracksProperties hp = new HyracksProperties(); String strIP = hp.getHyracksIPAddress(); int port = hp.getHyracksPort(); @@ -88,11 +98,16 @@ // originally determined there. Need to save this value on // some object that we can obtain here. SessionConfig sessionConfig = RESTAPIServlet.initResponse(request, response); - ResultUtil.displayResults(resultReader, sessionConfig, new Stats(), null); + ResultUtil.printResults(resultReader, sessionConfig, new Stats(), null); } catch (Exception e) { + respCode = HttpServletResponse.SC_BAD_REQUEST; out.println(e.getMessage()); - e.printStackTrace(out); + LOGGER.log(Level.WARNING, "Error retrieving result", e); + } + response.setStatus(respCode); + if (out.checkError()) { + LOGGER.warning("Error flushing output writer"); } } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryServiceServlet.java index 941c7f7..9da518a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryServiceServlet.java @@ -81,7 +81,8 @@ STATEMENT("statement"), FORMAT("format"), CLIENT_ID("client_context_id"), - PRETTY("pretty"); + PRETTY("pretty"), + MODE("mode"); private final String str; @@ -132,6 +133,7 @@ TYPE("type"), STATUS("status"), RESULTS("results"), + HANDLE("handle"), ERRORS("errors"), METRICS("metrics"); @@ -229,6 +231,7 @@ String format; boolean pretty; String clientContextID; + String mode; @Override public String toString() { @@ -242,6 +245,7 @@ sb.append("\", "); sb.append("\"format\": \"").append(format).append("\", "); sb.append("\"pretty\": ").append(pretty).append(", "); + sb.append("\"mode\": ").append(mode).append(", "); sb.append("\"clientContextID\": \"").append(clientContextID).append("\" "); sb.append('}'); return sb; @@ -279,7 +283,7 @@ return SessionConfig.OutputFormat.ADM; } if (format.startsWith(MediaType.JSON.str())) { - return Boolean.parseBoolean(getParameterValue(format, "lossless")) + return Boolean.parseBoolean(getParameterValue(format, Attribute.LOSSLESS.str())) ? SessionConfig.OutputFormat.LOSSLESS_JSON : SessionConfig.OutputFormat.CLEAN_JSON; } } @@ -302,19 +306,22 @@ } }; - SessionConfig.ResultDecorator resultPostfix = (AlgebricksAppendable app) -> { - app.append("\t,\n"); - return app; - }; + SessionConfig.ResultDecorator resultPostfix = (AlgebricksAppendable app) -> app.append("\t,\n"); + + SessionConfig.ResultDecorator handlePrefix = (AlgebricksAppendable app) -> app.append("\t\"").append + (ResultFields.HANDLE.str()).append("\": "); + + SessionConfig.ResultDecorator handlePostfix = (AlgebricksAppendable app) -> app.append(",\n"); SessionConfig.OutputFormat format = getFormat(param.format); - SessionConfig sessionConfig = new SessionConfig(resultWriter, format, resultPrefix, resultPostfix); + SessionConfig sessionConfig = new SessionConfig(resultWriter, format, resultPrefix, resultPostfix, + handlePrefix, handlePostfix); sessionConfig.set(SessionConfig.FORMAT_WRAPPER_ARRAY, true); sessionConfig.set(SessionConfig.FORMAT_INDENT_JSON, param.pretty); sessionConfig.set(SessionConfig.FORMAT_QUOTE_RECORD, format != SessionConfig.OutputFormat.CLEAN_JSON && format != SessionConfig.OutputFormat.LOSSLESS_JSON); sessionConfig.set(SessionConfig.FORMAT_CSV_HEADER, format == SessionConfig.OutputFormat.CSV - && "present".equals(getParameterValue(param.format, "header"))); + && "present".equals(getParameterValue(param.format, Attribute.HEADER.str()))); return sessionConfig; } @@ -440,6 +447,7 @@ param.statement = jsonRequest.get(Parameter.STATEMENT.str()).asText(); param.format = toLower(getOptText(jsonRequest, Parameter.FORMAT.str())); param.pretty = getOptBoolean(jsonRequest, Parameter.PRETTY.str(), false); + param.mode = toLower(getOptText(jsonRequest, Parameter.MODE.str())); param.clientContextID = getOptText(jsonRequest, Parameter.CLIENT_ID.str()); } catch (JsonParseException | JsonMappingException e) { // if the JSON parsing fails, the statement is empty and we get an empty statement error @@ -452,6 +460,7 @@ } param.format = toLower(request.getParameter(Parameter.FORMAT.str())); param.pretty = Boolean.parseBoolean(request.getParameter(Parameter.PRETTY.str())); + param.mode = toLower(request.getParameter(Parameter.MODE.str())); param.clientContextID = request.getParameter(Parameter.CLIENT_ID.str()); } return param; @@ -463,11 +472,23 @@ return sw.toString(); } + private static QueryTranslator.ResultDelivery parseResultDelivery(String mode) { + if ("async".equals(mode)) { + return QueryTranslator.ResultDelivery.ASYNC; + } else if ("deferred".equals(mode)) { + return QueryTranslator.ResultDelivery.DEFERRED; + } else { + return QueryTranslator.ResultDelivery.IMMEDIATE; + } + } + private void handleRequest(RequestParameters param, HttpServletResponse response) throws IOException { LOGGER.info(param.toString()); long elapsedStart = System.nanoTime(); final StringWriter stringWriter = new StringWriter(); final PrintWriter resultWriter = new PrintWriter(stringWriter); + + QueryTranslator.ResultDelivery delivery = parseResultDelivery(param.mode); SessionConfig sessionConfig = createSessionConfig(param, resultWriter); response.setCharacterEncoding("utf-8"); @@ -504,12 +525,12 @@ } } IParser parser = compilationProvider.getParserFactory().createParser(param.statement); - List<Statement> aqlStatements = parser.parse(); + List<Statement> statements = parser.parse(); MetadataManager.INSTANCE.init(); - IStatementExecutor translator = statementExecutorFactory.create(aqlStatements, sessionConfig, + IStatementExecutor translator = statementExecutorFactory.create(statements, sessionConfig, compilationProvider); execStart = System.nanoTime(); - translator.compileAndExecute(hcc, hds, QueryTranslator.ResultDelivery.SYNC, stats); + translator.compileAndExecute(hcc, hds, delivery, stats); execEnd = System.nanoTime(); printStatus(resultWriter, ResultStatus.SUCCESS); } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java index 558be35..4a06590 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/RESTAPIServlet.java @@ -49,6 +49,7 @@ import org.apache.asterix.translator.SessionConfig; import org.apache.asterix.translator.SessionConfig.OutputFormat; import org.apache.commons.io.IOUtils; +import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.dataset.IHyracksDataset; import org.apache.hyracks.client.dataset.HyracksDataset; @@ -108,7 +109,12 @@ format = OutputFormat.LOSSLESS_JSON; } - SessionConfig sessionConfig = new SessionConfig(response.getWriter(), format); + SessionConfig.ResultDecorator handlePrefix = (AlgebricksAppendable app) -> app.append("{ \"").append("handle") + .append("\": "); + SessionConfig.ResultDecorator handlePostfix = (AlgebricksAppendable app) -> app.append(" }"); + + SessionConfig sessionConfig = new SessionConfig(response.getWriter(), format, null, null, handlePrefix, + handlePostfix); // If it's JSON or ADM, check for the "wrapper-array" flag. Default is // "true" for JSON and "false" for ADM. (Not applicable for CSV.) @@ -228,13 +234,13 @@ protected QueryTranslator.ResultDelivery whichResultDelivery(HttpServletRequest request) { String mode = request.getParameter("mode"); if (mode != null) { - if (mode.equals("asynchronous")) { + if ("asynchronous".equals(mode) || "async".equals(mode)) { return QueryTranslator.ResultDelivery.ASYNC; - } else if (mode.equals("asynchronous-deferred")) { - return QueryTranslator.ResultDelivery.ASYNC_DEFERRED; + } else if ("asynchronous-deferred".equals(mode) || "deferred".equals(mode)) { + return QueryTranslator.ResultDelivery.DEFERRED; } } - return QueryTranslator.ResultDelivery.SYNC; + return QueryTranslator.ResultDelivery.IMMEDIATE; } protected abstract String getQueryParameter(HttpServletRequest request); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java index fd9c6cd..3d240f8 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java @@ -95,7 +95,7 @@ } IStatementExecutor translator = statementExecutorFactory.create(statements, conf, compilationProvider); - translator.compileAndExecute(hcc, null, QueryTranslator.ResultDelivery.SYNC); + translator.compileAndExecute(hcc, null, QueryTranslator.ResultDelivery.IMMEDIATE); writer.flush(); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java index 414585a..3bdf353 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedWorkCollection.java @@ -93,7 +93,7 @@ statements.add(subscribeStmt); IStatementExecutor translator = qtFactory.create(statements, pc, compilationProvider); translator.compileAndExecute(AsterixAppContextInfo.INSTANCE.getHcc(), null, - QueryTranslator.ResultDelivery.SYNC); + QueryTranslator.ResultDelivery.IMMEDIATE); if (LOGGER.isEnabledFor(Level.INFO)) { LOGGER.info("Submitted connection requests for execution: " + request); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultHandle.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultHandle.java new file mode 100644 index 0000000..05eb967 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultHandle.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.result; + +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable; +import org.apache.hyracks.api.dataset.ResultSetId; +import org.apache.hyracks.api.job.JobId; + +public class ResultHandle { + private long jobId; + private long resultSetId; + + public ResultHandle(JobId jobId, ResultSetId resultSetId) { + this.jobId = jobId.getId(); + this.resultSetId = resultSetId.getId(); + } + + public AlgebricksAppendable append(AlgebricksAppendable app) throws AlgebricksException { + return app.append("[").append(String.valueOf(jobId)).append(", ").append(String.valueOf(resultSetId)) + .append("]"); + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java index 22d2c23..22034c3 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java @@ -128,7 +128,7 @@ } } - private void displayRecord(String result) { + private void printRecord(String result) { String record = result; if (indentJSON) { // TODO(tillw): this is inefficient - do this during record generation @@ -152,7 +152,7 @@ printPrefix(); // TODO(tillw) evil hack quoteRecord = true; - displayRecord(record); + printRecord(record); printPostfix(); } @@ -179,7 +179,7 @@ conf.out().print(", "); } notFirst = true; - displayRecord(result); + printRecord(result); } frameBuffer.clear(); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java index f68f458..595c6ab 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java @@ -39,6 +39,7 @@ import org.apache.asterix.translator.SessionConfig; import org.apache.http.ParseException; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.log4j.Logger; import org.json.JSONArray; @@ -72,16 +73,27 @@ return escaped; } - public static void displayResults(ResultReader resultReader, SessionConfig conf, Stats stats, + public static void printResults(ResultReader resultReader, SessionConfig conf, Stats stats, ARecordType recordType) throws HyracksDataException { new ResultPrinter(conf, stats, recordType).print(resultReader); } - public static void displayResults(String record, SessionConfig conf, Stats stats, ARecordType recordType) + public static void printResults(String record, SessionConfig conf, Stats stats, ARecordType recordType) throws HyracksDataException { new ResultPrinter(conf, stats, recordType).print(record); } + public static void printResultHandle(ResultHandle handle, SessionConfig conf) throws HyracksDataException { + try { + final AlgebricksAppendable app = new AlgebricksAppendable(conf.out()); + conf.handlePrefix(app); + handle.append(app); + conf.handlePostfix(app); + } catch (AlgebricksException e) { + throw new HyracksDataException(e); + } + } + public static JSONObject getErrorResponse(int errorCode, String errorMessage, String errorSummary, String errorStackTrace) { JSONObject errorResp = new JSONObject(); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index cf6e49d..d5d5e53 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -50,6 +50,7 @@ import org.apache.asterix.app.external.ExternalIndexingOperations; import org.apache.asterix.app.external.FeedJoint; import org.apache.asterix.app.external.FeedOperations; +import org.apache.asterix.app.result.ResultHandle; import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.app.result.ResultUtil; import org.apache.asterix.common.config.AsterixExternalProperties; @@ -202,9 +203,6 @@ import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; import com.google.common.collect.Lists; @@ -344,7 +342,7 @@ if (((InsertStatement) stmt).getReturnQuery() != null) { metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++)); metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC - || resultDelivery == ResultDelivery.ASYNC_DEFERRED); + || resultDelivery == ResultDelivery.DEFERRED); } handleInsertUpsertStatement(metadataProvider, stmt, hcc, hdc, resultDelivery, stats, false); break; @@ -376,7 +374,7 @@ case Statement.Kind.QUERY: metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++)); metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC - || resultDelivery == ResultDelivery.ASYNC_DEFERRED); + || resultDelivery == ResultDelivery.DEFERRED); handleQuery(metadataProvider, (Query) stmt, hcc, hdc, resultDelivery, stats); break; case Statement.Kind.COMPACT: @@ -1881,7 +1879,6 @@ MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(dataverseName, dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(), query.getDatasets()); - JobSpecification compiled = null; try { metadataProvider.setWriteTransaction(true); CompiledInsertStatement clfrqs = null; @@ -1899,19 +1896,19 @@ default: throw new AlgebricksException("Unsupported statement type " + stmtInsertUpsert.getKind()); } - compiled = rewriteCompileQuery(metadataProvider, query, clfrqs); + JobSpecification jobSpec = rewriteCompileQuery(metadataProvider, query, clfrqs); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; - if (compiled != null && !compileOnly) { + if (jobSpec != null && !compileOnly) { if (stmtInsertUpsert.getReturnQuery() != null) { - handleQueryResult(metadataProvider, hcc, hdc, compiled, resultDelivery, stats); + handleQueryResult(metadataProvider, hcc, hdc, jobSpec, resultDelivery, stats); } else { - JobUtils.runJob(hcc, compiled, true); + JobUtils.runJob(hcc, jobSpec, true); } } - + return jobSpec; } catch (Exception e) { if (bActiveTxn) { abort(e, e, mdTxnCtx); @@ -1922,7 +1919,6 @@ dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(), query.getDatasets()); } - return compiled; } public void handleDeleteStatement(MetadataProvider metadataProvider, Statement stmt, @@ -1942,13 +1938,13 @@ CompiledDeleteStatement clfrqs = new CompiledDeleteStatement(stmtDelete.getVariableExpr(), dataverseName, stmtDelete.getDatasetName().getValue(), stmtDelete.getCondition(), stmtDelete.getVarCounter(), stmtDelete.getQuery()); - JobSpecification compiled = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs); + JobSpecification jobSpec = rewriteCompileQuery(metadataProvider, clfrqs.getQuery(), clfrqs); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; - if (compiled != null) { - JobUtils.runJob(hcc, compiled, true); + if (jobSpec != null) { + JobUtils.runJob(hcc, jobSpec, true); } } catch (Exception e) { @@ -1966,7 +1962,7 @@ @Override public JobSpecification rewriteCompileQuery(MetadataProvider metadataProvider, Query query, ICompiledDmlStatement stmt) - throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException { + throws AsterixException, RemoteException, AlgebricksException, ACIDException { // Query Rewriting (happens under the same ongoing metadata transaction) Pair<Query, Integer> reWrittenQuery = apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query, @@ -2421,7 +2417,7 @@ metadataProvider.getConfig().put(FeedActivityDetails.COLLECT_LOCATIONS, StringUtils.join(bfs.getLocations(), ',')); - JobSpecification compiled = rewriteCompileQuery(metadataProvider, bfs.getQuery(), csfs); + JobSpecification jobSpec = rewriteCompileQuery(metadataProvider, bfs.getQuery(), csfs); FeedConnectionId feedConnectionId = new FeedConnectionId(bfs.getSubscriptionRequest().getReceivingFeedId(), bfs.getSubscriptionRequest().getTargetDataset()); String dataverse = feedConnectionId.getFeedId().getDataverse(); @@ -2429,7 +2425,7 @@ MetadataLockManager.INSTANCE.subscribeFeedBegin(dataverse, dataverse + "." + dataset, dataverse + "." + feedConnectionId.getFeedId().getEntityName()); try { - JobSpecification alteredJobSpec = FeedMetadataUtil.alterJobSpecificationForFeed(compiled, feedConnectionId, + JobSpecification alteredJobSpec = FeedMetadataUtil.alterJobSpecificationForFeed(jobSpec, feedConnectionId, bfs.getSubscriptionRequest().getPolicyParameters()); FeedPolicyEntity policy = metadataProvider.findFeedPolicy(dataverse, bfs.getPolicy()); if (policy == null) { @@ -2441,7 +2437,7 @@ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; - if (compiled != null) { + if (jobSpec != null) { FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE .getActiveEntityListener(bfs.getSubscriptionRequest().getReceivingFeedId()); FeedConnectJobInfo activeJob = new FeedConnectJobInfo(bfs.getSubscriptionRequest().getReceivingFeedId(), @@ -2563,19 +2559,19 @@ boolean bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); MetadataLockManager.INSTANCE.queryBegin(activeDefaultDataverse, query.getDataverses(), query.getDatasets()); - JobSpecification compiled; try { - compiled = rewriteCompileQuery(metadataProvider, query, null); + JobSpecification jobSpec = rewriteCompileQuery(metadataProvider, query, null); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; if (query.isExplain()) { sessionConfig.out().flush(); - return compiled; - } else if (sessionConfig.isExecuteQuery() && compiled != null) { - handleQueryResult(metadataProvider, hcc, hdc, compiled, resultDelivery, stats); + return jobSpec; + } else if (sessionConfig.isExecuteQuery() && jobSpec != null) { + handleQueryResult(metadataProvider, hcc, hdc, jobSpec, resultDelivery, stats); } + return jobSpec; } catch (Exception e) { LOGGER.log(Level.INFO, e.getMessage(), e); if (bActiveTxn) { @@ -2587,46 +2583,33 @@ // release external datasets' locks acquired during compilation of the query ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider); } - return compiled; } private void handleQueryResult(MetadataProvider metadataProvider, IHyracksClientConnection hcc, - IHyracksDataset hdc, JobSpecification compiled, ResultDelivery resultDelivery, Stats stats) + IHyracksDataset hdc, JobSpecification jobSpec, ResultDelivery resultDelivery, Stats stats) throws Exception { if (GlobalConfig.ASTERIX_LOGGER.isLoggable(Level.FINE)) { - GlobalConfig.ASTERIX_LOGGER.fine(compiled.toJSON().toString(1)); + GlobalConfig.ASTERIX_LOGGER.fine(jobSpec.toJSON().toString(1)); } - JobId jobId = JobUtils.runJob(hcc, compiled, false); + JobId jobId = JobUtils.runJob(hcc, jobSpec, false); - JSONObject response = new JSONObject(); switch (resultDelivery) { case ASYNC: - JSONArray handle = new JSONArray(); - handle.put(jobId.getId()); - handle.put(metadataProvider.getResultSetId().getId()); - response.put("handle", handle); - sessionConfig.out().print(response); - sessionConfig.out().flush(); + ResultUtil.printResultHandle(new ResultHandle(jobId, metadataProvider.getResultSetId()), sessionConfig); hcc.waitForCompletion(jobId); break; - case SYNC: + case IMMEDIATE: hcc.waitForCompletion(jobId); ResultReader resultReader = new ResultReader(hdc); resultReader.open(jobId, metadataProvider.getResultSetId()); - ResultUtil.displayResults(resultReader, sessionConfig, stats, metadataProvider.findOutputRecordType()); + ResultUtil.printResults(resultReader, sessionConfig, stats, metadataProvider.findOutputRecordType()); break; - case ASYNC_DEFERRED: - handle = new JSONArray(); - handle.put(jobId.getId()); - handle.put(metadataProvider.getResultSetId().getId()); - response.put("handle", handle); + case DEFERRED: hcc.waitForCompletion(jobId); - sessionConfig.out().print(response); - sessionConfig.out().flush(); + ResultUtil.printResultHandle(new ResultHandle(jobId, metadataProvider.getResultSetId()), sessionConfig); break; default: break; - } } diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/ResultExtractor.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/ResultExtractor.java index d99602f..662888e 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/ResultExtractor.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/ResultExtractor.java @@ -39,7 +39,8 @@ private static final Logger LOGGER = Logger.getLogger(ResultExtractor.class.getName()); public static InputStream extract(InputStream resultStream) throws Exception { - String result = IOUtils.toString(resultStream, Charset.forName("UTF-8")); + final Charset utf8 = Charset.forName("UTF-8"); + String result = IOUtils.toString(resultStream, utf8); LOGGER.fine("+++++++\n" + result + "\n+++++++\n"); @@ -78,7 +79,17 @@ if (! "success".equals(status)) { throw new Exception("Unexpected status: '" + status + "'"); } - return IOUtils.toInputStream(results); + return IOUtils.toInputStream(results, utf8); + } + + public static String extractHandle(InputStream resultStream) throws Exception { + final Charset utf8 = Charset.forName("UTF-8"); + String result = IOUtils.toString(resultStream, utf8); + JSONObject parsed = new JSONObject(result); + JSONArray handle = parsed.getJSONArray("handle"); + JSONObject res = new JSONObject(); + res.put("handle", handle); + return res.toString(); } private static String getFieldName(JSONTokener tokener) throws JSONException { diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java index 2da57e3..bfe0e33 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java @@ -430,7 +430,7 @@ public InputStream executeQueryService(String str, OutputFormat fmt, String url, List<CompilationUnit.Parameter> params, boolean jsonEncoded) throws Exception { - setFormatParam(params, fmt); + setParam(params, "format", fmt.mimeType()); HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str, url, "statement", params) : constructPostMethodUrl(str, url, "statement", params); // Set accepted output response type @@ -439,16 +439,24 @@ return response.getEntity().getContent(); } - protected void setFormatParam(List<CompilationUnit.Parameter> params, OutputFormat fmt) { + public InputStream executeQueryService(String statement, OutputFormat fmt, String url, + List<CompilationUnit.Parameter> params, boolean jsonEncoded, String deferred) throws Exception { + setParam(params, "mode", deferred); + InputStream resultStream = executeQueryService(statement, fmt, url, params, jsonEncoded); + String handle = ResultExtractor.extractHandle(resultStream); + return getHandleResult(handle, fmt); + } + + protected void setParam(List<CompilationUnit.Parameter> params, String name, String value) { for (CompilationUnit.Parameter param : params) { - if ("format".equals(param.getName())) { - param.setValue(fmt.mimeType()); + if (name.equals(param.getName())) { + param.setValue(value); return; } } CompilationUnit.Parameter formatParam = new CompilationUnit.Parameter(); - formatParam.setName("format"); - formatParam.setValue(fmt.mimeType()); + formatParam.setName(name); + formatParam.setValue(value); params.add(formatParam); } @@ -781,14 +789,16 @@ resultStream = executeAnyAQLAsync(statement, true, fmt, getEndpoint(Servlets.AQL)); } } else { - if (ctx.getType().equalsIgnoreCase("query")) { - resultStream = executeQueryService(statement, fmt, getEndpoint(Servlets.QUERY_SERVICE), - cUnit.getParameter(), true); + final String reqType = ctx.getType(); + final String url = getEndpoint(Servlets.QUERY_SERVICE); + final List<CompilationUnit.Parameter> params = cUnit.getParameter(); + if (reqType.equalsIgnoreCase("query")) { + resultStream = executeQueryService(statement, fmt, url, params, true); resultStream = ResultExtractor.extract(resultStream); - } else if (ctx.getType().equalsIgnoreCase("async")) { - resultStream = executeAnyAQLAsync(statement, false, fmt, getEndpoint(Servlets.SQLPP)); - } else if (ctx.getType().equalsIgnoreCase("asyncdefer")) { - resultStream = executeAnyAQLAsync(statement, true, fmt, getEndpoint(Servlets.SQLPP)); + } else if (reqType.equalsIgnoreCase("async")) { + resultStream = executeQueryService(statement, fmt, url, params, true, "async"); + } else if (reqType.equalsIgnoreCase("asyncdefer")) { + resultStream = executeQueryService(statement, fmt, url, params, true, "deferred"); } } if (queryCount.intValue() >= expectedResultFileCtxs.size()) { -- To view, visit https://asterix-gerrit.ics.uci.edu/1373 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I72d53be824d8dbf1d9f547b01f19097d0dc18add Gerrit-PatchSet: 5 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Till Westmann <[email protected]> Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
