Murtadha Hubail has submitted this change and it was merged. Change subject: [ASTERIXDB-2518][RT] Introduce Request Tracker ......................................................................
[ASTERIXDB-2518][RT] Introduce Request Tracker - user model changes: no - storage format changes: no - interface changes: yes Details: - Introduce IReceptionist to generate request references. - Track all requests by uuid. - Add more information to active_requests response. - Replace StatementExecutorContext by RequestTracker. - Deprecate StatementExecutorContext (to be removed) - Allow extensions to set optional parameters in query service. - Return forbidden when a cancellation is attempted on a request that is not cancellable. Change-Id: If08ecd91c55881743b2ecf40a628fa3d4166c554 Reviewed-on: https://asterix-gerrit.ics.uci.edu/3163 Reviewed-by: Till Westmann <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M asterixdb/asterix-algebra/pom.xml M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java D asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java A asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java A asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java D asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp M asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionistFactory.java A 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestReference.java A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/RequestReference.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/RequestStatus.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java M hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java 41 files changed, 847 insertions(+), 271 deletions(-) Approvals: Anon. E. Moose #1000171: Till Westmann: Looks good to me, approved Jenkins: Verified; Verified Objections: Jenkins: Violations found; Violations found diff --git a/asterixdb/asterix-algebra/pom.xml b/asterixdb/asterix-algebra/pom.xml index 369d93b..ee0bb5e 100644 --- a/asterixdb/asterix-algebra/pom.xml +++ b/asterixdb/asterix-algebra/pom.xml @@ -246,5 +246,13 @@ <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-http</artifactId> + </dependency> </dependencies> </project> diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java index ec44d60..50e6cc2 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java @@ -19,6 +19,8 @@ package 
org.apache.asterix.translator; import org.apache.asterix.common.api.IClientRequest; +import org.apache.asterix.common.api.IRequestReference; +import org.apache.asterix.common.api.RequestReference; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.om.base.ADateTime; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -27,14 +29,14 @@ import com.fasterxml.jackson.databind.node.ObjectNode; public abstract class BaseClientRequest implements IClientRequest { - protected final IStatementExecutorContext ctx; - protected final long requestTime = System.currentTimeMillis(); - protected final String contextId; - private boolean complete; - public BaseClientRequest(IStatementExecutorContext ctx, String contextId) { - this.ctx = ctx; - this.contextId = contextId; + private boolean complete; + private final IRequestReference requestReference; + private boolean cancellable = false; + protected volatile String state = "received"; + + public BaseClientRequest(IRequestReference requestReference) { + this.requestReference = requestReference; } @Override @@ -43,7 +45,6 @@ return; } complete = true; - ctx.remove(contextId); } @Override @@ -52,24 +53,55 @@ return; } complete(); - doCancel(appCtx); + if (cancellable) { + doCancel(appCtx); + } + } + + @Override + public synchronized void markCancellable() { + cancellable = true; + } + + @Override + public String getId() { + // the uuid is generated by the node which received the request + // so there is a chance this might not be unique now + return requestReference.getUuid(); + } + + @Override + public synchronized boolean isCancellable() { + return cancellable; + } + + public void setRunning() { + state = "running"; } @Override public String toJson() { - try { - return JSONUtil.convertNode(asJson()); - } catch (Exception e) { - throw new IllegalStateException(e); - } + return JSONUtil.convertNodeOrThrow(asJson()); } protected ObjectNode asJson() { ObjectNode json = 
JSONUtil.createObject(); - json.put("requestTime", new ADateTime(requestTime).toSimpleString()); - json.put("clientContextID", contextId); + json.put("uuid", requestReference.getUuid()); + json.put("requestTime", new ADateTime(requestReference.getTime()).toSimpleString()); + json.put("elapsedTime", getElapsedTime()); + json.put("node", requestReference.getNode()); + json.put("state", state); + json.put("userAgent", ((RequestReference) requestReference).getUserAgent()); + json.put("remoteAddr", ((RequestReference) requestReference).getRemoteAddr()); + json.put("cancellable", cancellable); return json; } + private String getElapsedTime() { + // this is just an estimation as the request might have been received on a node with a different system time + // TODO add dynamic time unit + return System.currentTimeMillis() - requestReference.getTime() + "ms"; + } + protected abstract void doCancel(ICcApplicationContext appCtx) throws HyracksDataException; } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java deleted file mode 100644 index 81714ca..0000000 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.translator; - -import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.util.JSONUtil; - -import com.fasterxml.jackson.databind.node.ObjectNode; - -public class ClientJobRequest extends BaseClientRequest { - private final JobId jobId; - - public ClientJobRequest(IStatementExecutorContext ctx, String clientCtxId, JobId jobId) { - super(ctx, clientCtxId); - this.jobId = jobId; - } - - @Override - protected void doCancel(ICcApplicationContext appCtx) throws HyracksDataException { - IHyracksClientConnection hcc = appCtx.getHcc(); - try { - hcc.cancelJob(jobId); - } catch (Exception e) { - throw HyracksDataException.create(e); - } - ctx.remove(contextId); - } - - @Override - public String toJson() { - final ObjectNode jsonNode = super.asJson(); - jsonNode.put("jobId", jobId.toString()); - try { - return JSONUtil.convertNode(jsonNode); - } catch (Exception e) { - throw new IllegalStateException(e); - } - } -} diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java new file mode 100644 index 0000000..014bf3c --- /dev/null +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.translator; + +import java.util.Map; + +import org.apache.asterix.common.api.IRequestReference; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobId; + +import com.fasterxml.jackson.databind.node.ObjectNode; + +public class ClientRequest extends BaseClientRequest { + + protected String statement; + protected JobId jobId; + protected Thread executor; + protected String clientContextId; + + public ClientRequest(IRequestReference requestReference, String clientContextId, String statement, + Map<String, String> optionalParameters) { + super(requestReference); + this.clientContextId = clientContextId; + this.statement = statement; + this.executor = Thread.currentThread(); + } + + @Override + public String getClientContextId() { + return clientContextId; + } + + public synchronized void setJobId(JobId jobId) { + this.jobId = jobId; + setRunning(); + } + + public Thread getExecutor() { + return executor; + } + + @Override + protected void doCancel(ICcApplicationContext appCtx) throws HyracksDataException { + // if the 
request has a job, we abort the job and do not interrupt the thread as it will be notified + // that the job has been cancelled. Otherwise, we interrupt the thread + if (jobId != null) { + IHyracksClientConnection hcc = appCtx.getHcc(); + try { + hcc.cancelJob(jobId); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } else if (executor != null) { + executor.interrupt(); + } + } + + @Override + protected ObjectNode asJson() { + ObjectNode json = super.asJson(); + json.put("jobId", jobId.toString()); + json.put("statement", statement); + json.put("clientContextID", clientContextId); + return json; + } +} diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java index 58f0997..86ba301 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IRequestParameters.java @@ -20,6 +20,7 @@ import java.util.Map; +import org.apache.asterix.common.api.IRequestReference; import org.apache.asterix.om.base.IAObject; import org.apache.asterix.translator.IStatementExecutor.Stats; import org.apache.hyracks.api.result.IResultSet; @@ -67,4 +68,18 @@ * @return true if the request accepts multiple statements. Otherwise, false. 
*/ boolean isMultiStatement(); + + /** + * Gets the statement the client provided with the request + * + * @return the request statement + */ + String getStatement(); + + /** + * The request reference of this {@link IRequestParameters} + * + * @return the request reference + */ + IRequestReference getRequestReference(); } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java index 9bc86da..93eed8a 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java @@ -110,12 +110,10 @@ * Compiles and executes a list of statements * * @param hcc - * @param ctx * @param requestParameters * @throws Exception */ - void compileAndExecute(IHyracksClientConnection hcc, IStatementExecutorContext ctx, - IRequestParameters requestParameters) throws Exception; + void compileAndExecute(IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception; /** * rewrites and compiles query into a hyracks job specifications diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java index 29e7bda..9648036 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java @@ -24,8 +24,9 @@ import org.apache.asterix.common.api.IClientRequest; /** - * The context for statement executors. Maintains ongoing user requests. 
+ * @deprecated (use IRequestTracker) */ +@Deprecated public interface IStatementExecutorContext { /** diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java new file mode 100644 index 0000000..8d06143 --- /dev/null +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.translator; + +import java.util.Map; +import java.util.UUID; + +import org.apache.asterix.common.api.IClientRequest; +import org.apache.asterix.common.api.IReceptionist; +import org.apache.asterix.common.api.IRequestReference; +import org.apache.asterix.common.api.RequestReference; +import org.apache.http.HttpHeaders; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.http.api.IServletRequest; + +public class Receptionist implements IReceptionist { + + private final String node; + + public Receptionist(String node) { + this.node = node; + } + + @Override + public IRequestReference welcome(IServletRequest request) { + final String uuid = UUID.randomUUID().toString(); + final RequestReference ref = RequestReference.of(uuid, node, System.currentTimeMillis()); + ref.setUserAgent(request.getHeader(HttpHeaders.USER_AGENT)); + //TODO set remote address + return ref; + } + + @Override + public IClientRequest requestReceived(IRequestReference requestRef, String clientContextId, String statement, + Map<String, String> optionalParameters) throws HyracksDataException { + return new ClientRequest(requestRef, clientContextId, statement, optionalParameters); + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java deleted file mode 100644 index 136fda7..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.api.http.ctx; - -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.asterix.common.api.IClientRequest; -import org.apache.asterix.translator.IStatementExecutorContext; - -public class StatementExecutorContext implements IStatementExecutorContext { - - private final Map<String, IClientRequest> runningQueries = new ConcurrentHashMap<>(); - - @Override - public IClientRequest get(String clientContextId) { - return runningQueries.get(clientContextId); - } - - @Override - public void put(String clientContextId, IClientRequest req) { - runningQueries.put(clientContextId, req); - } - - @Override - public IClientRequest remove(String clientContextId) { - return runningQueries.remove(clientContextId); - } - - @Override - public Map<String, IClientRequest> getRunningRequests() { - return Collections.unmodifiableMap(runningQueries); - } -} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java index 9844900..b23fa1e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java @@ 
-120,10 +120,8 @@ return hcc; } - protected static UUID printRequestId(PrintWriter pw) { - UUID requestId = UUID.randomUUID(); - ResultUtil.printField(pw, ResultFields.REQUEST_ID.str(), requestId.toString()); - return requestId; + protected static void printRequestId(PrintWriter pw, String requestId) { + ResultUtil.printField(pw, ResultFields.REQUEST_ID.str(), requestId); } protected static void printHandle(PrintWriter pw, String handle, boolean comma) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java index d206336..ee3eb9c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java @@ -33,6 +33,7 @@ import javax.imageio.ImageIO; import org.apache.asterix.app.translator.RequestParameters; +import org.apache.asterix.common.api.IRequestReference; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; @@ -89,6 +90,7 @@ @Override protected void post(IServletRequest request, IServletResponse response) { + final IRequestReference requestReference = appCtx.getReceptionist().welcome(request); // Query language ILangCompilationProvider compilationProvider = "AQL".equals(request.getParameter("query-language")) ? 
aqlCompilationProvider : sqlppCompilationProvider; @@ -149,10 +151,10 @@ compilationProvider, componentProvider); double duration; long startTime = System.currentTimeMillis(); - final IRequestParameters requestParameters = - new RequestParameters(resultSet, new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE), - new IStatementExecutor.Stats(), null, null, null, null, true); - translator.compileAndExecute(hcc, null, requestParameters); + final IRequestParameters requestParameters = new RequestParameters(requestReference, query, resultSet, + new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE), new IStatementExecutor.Stats(), + null, null, null, null, true); + translator.compileAndExecute(hcc, requestParameters); long endTime = System.currentTimeMillis(); duration = (endTime - startTime) / 1000.00; out.println(HTML_STATEMENT_SEPARATOR); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java index 5f5692d..d260a0b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java @@ -23,8 +23,8 @@ import org.apache.asterix.api.http.server.QueryServiceServlet.Parameter; import org.apache.asterix.common.api.IClientRequest; +import org.apache.asterix.common.api.IRequestTracker; import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; import org.apache.hyracks.http.server.AbstractServlet; @@ -54,17 +54,20 @@ response.setStatus(HttpResponseStatus.BAD_REQUEST); return; } - IStatementExecutorContext executorCtx = - (IStatementExecutorContext) 
ctx.get(ServletConstants.RUNNING_QUERIES_ATTR); - IClientRequest req = executorCtx.get(clientContextId); + final IRequestTracker requestTracker = appCtx.getRequestTracker(); + final IClientRequest req = requestTracker.getByClientContextId(clientContextId); if (req == null) { // response: NOT FOUND response.setStatus(HttpResponseStatus.NOT_FOUND); return; } + if (!req.isCancellable()) { + response.setStatus(HttpResponseStatus.FORBIDDEN); + return; + } try { // Cancels the on-going job. - req.cancel(appCtx); + requestTracker.cancel(req.getId()); // response: OK response.setStatus(HttpResponseStatus.OK); } catch (Exception e) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java index 362f924..55f3369 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java @@ -20,7 +20,6 @@ package org.apache.asterix.api.http.server; import java.util.Map; -import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -33,6 +32,7 @@ import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.common.api.Duration; import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.api.IRequestReference; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.ExceptionUtils; @@ -69,10 +69,10 @@ } @Override - protected void executeStatement(String statementsText, SessionOutput sessionOutput, - ResultProperties resultProperties, IStatementExecutor.Stats stats, QueryServiceRequestParameters param, - RequestExecutionState execution, Map<String, String> 
optionalParameters, - Map<String, byte[]> statementParameters) throws Exception { + protected void executeStatement(IRequestReference requestReference, String statementsText, + SessionOutput sessionOutput, ResultProperties resultProperties, IStatementExecutor.Stats stats, + QueryServiceRequestParameters param, RequestExecutionState execution, + Map<String, String> optionalParameters, Map<String, byte[]> statementParameters) throws Exception { // Running on NC -> send 'execute' message to CC INCServiceContext ncCtx = (INCServiceContext) serviceCtx; INCMessageBroker ncMb = (INCMessageBroker) ncCtx.getMessageBroker(); @@ -81,9 +81,6 @@ MessageFuture responseFuture = ncMb.registerMessageFuture(); final String handleUrl = getHandleUrl(param.getHost(), param.getPath(), delivery); try { - if (param.getClientContextID() == null) { - param.setClientContextID(UUID.randomUUID().toString()); - } long timeout = ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS; if (param.getTimeout() != null && !param.getTimeout().trim().isEmpty()) { timeout = TimeUnit.NANOSECONDS.toMillis(Duration.parseDurationStringToNanos(param.getTimeout())); @@ -91,7 +88,7 @@ ExecuteStatementRequestMessage requestMsg = new ExecuteStatementRequestMessage(ncCtx.getNodeId(), responseFuture.getFutureId(), queryLanguage, statementsText, sessionOutput.config(), resultProperties.getNcToCcResultProperties(), param.getClientContextID(), handleUrl, - optionalParameters, statementParameters, param.isMultiStatement()); + optionalParameters, statementParameters, param.isMultiStatement(), requestReference); execution.start(); ncMb.sendMessageToPrimaryCC(requestMsg); try { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java index 99df372..a34f6b4 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java +++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java @@ -43,6 +43,8 @@ import org.apache.asterix.common.api.Duration; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.api.IClusterManagementWork; +import org.apache.asterix.common.api.IReceptionist; +import org.apache.asterix.common.api.IRequestReference; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; @@ -64,7 +66,6 @@ import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.IStatementExecutor.ResultDelivery; import org.apache.asterix.translator.IStatementExecutor.Stats; -import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.asterix.translator.IStatementExecutorFactory; import org.apache.asterix.translator.ResultProperties; import org.apache.asterix.translator.SessionConfig; @@ -93,7 +94,7 @@ private final ILangCompilationProvider compilationProvider; private final IStatementExecutorFactory statementExecutorFactory; private final IStorageComponentProvider componentProvider; - private final IStatementExecutorContext queryCtx; + private final IReceptionist receptionist; protected final IServiceContext serviceCtx; protected final Function<IServletRequest, Map<String, String>> optionalParamProvider; protected String hostName; @@ -107,7 +108,7 @@ this.compilationProvider = compilationProvider; this.statementExecutorFactory = statementExecutorFactory; this.componentProvider = componentProvider; - this.queryCtx = (IStatementExecutorContext) ctx.get(ServletConstants.RUNNING_QUERIES_ATTR); + receptionist = appCtx.getReceptionist(); this.serviceCtx = (IServiceContext) ctx.get(ServletConstants.SERVICE_CONTEXT_ATTR); this.optionalParamProvider = optionalParamProvider; try { @@ -345,7 +346,7 @@ pw.print("\t}\n"); } - private String 
getOptText(JsonNode node, String fieldName) { + protected String getOptText(JsonNode node, String fieldName) { final JsonNode value = node.get(fieldName); return value != null ? value.asText() : null; } @@ -397,7 +398,7 @@ String contentType = HttpUtil.getContentTypeOnly(request); if (HttpUtil.ContentType.APPLICATION_JSON.equals(contentType)) { try { - setParamFromJSON(request, param); + setParamFromJSON(request, param, optionalParams); } catch (JsonParseException | JsonMappingException e) { // if the JSON parsing fails, the statement is empty and we get an empty statement error GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, e.getMessage(), e); @@ -407,7 +408,8 @@ } } - private void setParamFromJSON(IServletRequest request, QueryServiceRequestParameters param) throws IOException { + private void setParamFromJSON(IServletRequest request, QueryServiceRequestParameters param, + Map<String, String> optionalParameters) throws IOException { JsonNode jsonRequest = OBJECT_MAPPER.readTree(HttpUtil.getRequestBody(request)); param.setFormat(toLower(getOptText(jsonRequest, Parameter.FORMAT.str()))); param.setPretty(getOptBoolean(jsonRequest, Parameter.PRETTY.str(), false)); @@ -430,6 +432,11 @@ if (jsonRequest.has(statementParam)) { param.setStatement(jsonRequest.get(statementParam).asText()); } + setJsonOptionalParameters(jsonRequest, optionalParameters); + } + + protected void setJsonOptionalParameters(JsonNode jsonRequest, Map<String, String> optionalParameters) { + // allows extensions to set extra parameters } private void setParamFromRequest(IServletRequest request, QueryServiceRequestParameters param) throws IOException { @@ -503,6 +510,7 @@ } private void handleRequest(IServletRequest request, IServletResponse response) { + final IRequestReference requestRef = receptionist.welcome(request); long elapsedStart = System.nanoTime(); long errorCount = 1; Stats stats = new Stats(); @@ -527,7 +535,7 @@ final ResultProperties resultProperties = param.getMaxResultReads() == null 
? new ResultProperties(delivery) : new ResultProperties(delivery, Long.parseLong(param.getMaxResultReads())); printAdditionalResultFields(sessionOutput.out()); - printRequestId(sessionOutput.out()); + printRequestId(sessionOutput.out(), requestRef.getUuid()); printClientContextID(sessionOutput.out(), param); if (!param.isParseOnly()) { printSignature(sessionOutput.out(), param); @@ -544,10 +552,9 @@ } else { Map<String, byte[]> statementParams = org.apache.asterix.app.translator.RequestParameters .serializeParameterValues(param.getStatementParams()); - setAccessControlHeaders(request, response); response.setStatus(execution.getHttpStatus()); - executeStatement(statementsText, sessionOutput, resultProperties, stats, param, execution, + executeStatement(requestRef, statementsText, sessionOutput, resultProperties, stats, param, execution, optionalParams, statementParams); if (ResultDelivery.IMMEDIATE == delivery || ResultDelivery.DEFERRED == delivery) { ResultUtil.printStatus(sessionOutput, execution.getResultStatus()); @@ -594,10 +601,10 @@ return parseOnlyResult; } - protected void executeStatement(String statementsText, SessionOutput sessionOutput, - ResultProperties resultProperties, Stats stats, QueryServiceRequestParameters param, - RequestExecutionState execution, Map<String, String> optionalParameters, - Map<String, byte[]> statementParameters) throws Exception { + protected void executeStatement(IRequestReference requestReference, String statementsText, + SessionOutput sessionOutput, ResultProperties resultProperties, Stats stats, + QueryServiceRequestParameters param, RequestExecutionState execution, + Map<String, String> optionalParameters, Map<String, byte[]> statementParameters) throws Exception { IClusterManagementWork.ClusterState clusterState = ((ICcApplicationContext) appCtx).getClusterStateManager().getState(); if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) { @@ -612,10 +619,10 @@ execution.start(); Map<String, IAObject> stmtParams = 
org.apache.asterix.app.translator.RequestParameters.deserializeParameterValues(statementParameters); - IRequestParameters requestParameters = - new org.apache.asterix.app.translator.RequestParameters(getResultSet(), resultProperties, stats, null, - param.getClientContextID(), optionalParameters, stmtParams, param.isMultiStatement()); - translator.compileAndExecute(getHyracksClientConnection(), queryCtx, requestParameters); + IRequestParameters requestParameters = new org.apache.asterix.app.translator.RequestParameters(requestReference, + statementsText, getResultSet(), resultProperties, stats, null, param.getClientContextID(), + optionalParameters, stmtParams, param.isMultiStatement()); + translator.compileAndExecute(getHyracksClientConnection(), requestParameters); execution.end(); printExecutionPlans(sessionOutput, translator.getExecutionPlans()); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java index 0aa2211..520f85a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java @@ -26,6 +26,7 @@ import org.apache.asterix.app.translator.QueryTranslator; import org.apache.asterix.app.translator.RequestParameters; +import org.apache.asterix.common.api.IRequestReference; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; @@ -176,16 +177,16 @@ response.setHeader("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept"); SessionOutput sessionOutput = initResponse(request, response); QueryTranslator.ResultDelivery resultDelivery = whichResultDelivery(request); - doHandle(response, query, sessionOutput, resultDelivery); + final 
IRequestReference requestReference = appCtx.getReceptionist().welcome(request); + doHandle(requestReference, response, query, sessionOutput, resultDelivery); } catch (Exception e) { response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); LOGGER.log(Level.WARN, "Failure handling request", e); - return; } } - private void doHandle(IServletResponse response, String query, SessionOutput sessionOutput, - ResultDelivery resultDelivery) throws JsonProcessingException { + private void doHandle(IRequestReference requestReference, IServletResponse response, String query, + SessionOutput sessionOutput, ResultDelivery resultDelivery) throws JsonProcessingException { try { response.setStatus(HttpResponseStatus.OK); IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR); @@ -196,9 +197,9 @@ IStatementExecutor translator = statementExecutorFactory.create(appCtx, statements, sessionOutput, compilationProvider, componentProvider); final IResultSet resultSet = ServletUtil.getResultSet(hcc, appCtx, ctx); - final IRequestParameters requestParameters = new RequestParameters(resultSet, + final IRequestParameters requestParameters = new RequestParameters(requestReference, query, resultSet, new ResultProperties(resultDelivery), new IStatementExecutor.Stats(), null, null, null, null, true); - translator.compileAndExecute(hcc, null, requestParameters); + translator.compileAndExecute(hcc, requestParameters); } catch (AsterixException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) { response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); GlobalConfig.ASTERIX_LOGGER.log(Level.ERROR, pe.getMessage(), pe); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java index 71b4b81..a5c8645 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java +++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/java/AsterixJavaClient.java @@ -22,9 +22,11 @@ import java.io.Reader; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.asterix.api.common.APIFramework; import org.apache.asterix.app.translator.RequestParameters; +import org.apache.asterix.common.api.RequestReference; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.utils.Job; @@ -113,7 +115,8 @@ while ((ch = queryText.read()) != -1) { builder.append((char) ch); } - IParser parser = parserFactory.createParser(builder.toString()); + String statement = builder.toString(); + IParser parser = parserFactory.createParser(statement); List<Statement> statements = parser.parse(); MetadataManager.INSTANCE.init(); @@ -126,10 +129,12 @@ IStatementExecutor translator = statementExecutorFactory.create(appCtx, statements, output, compilationProvider, storageComponentProvider); - final IRequestParameters requestParameters = - new RequestParameters(null, new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE), - new IStatementExecutor.Stats(), null, null, null, statementParams, true); - translator.compileAndExecute(hcc, null, requestParameters); + final RequestReference requestReference = + RequestReference.of(UUID.randomUUID().toString(), "CC", System.currentTimeMillis()); + final IRequestParameters requestParameters = new RequestParameters(requestReference, statement, null, + new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE), new IStatementExecutor.Stats(), null, + null, null, statementParams, true); + translator.compileAndExecute(hcc, requestParameters); writer.flush(); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java index 
9d15131..3ae18a7 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ActiveRequestsRequest.java @@ -46,10 +46,7 @@ @Override public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { - ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService(); - CCApplication application = (CCApplication) ccs.getApplication(); - IStatementExecutorContext executorsCtx = application.getStatementExecutorContext(); - final Collection<IClientRequest> runningRequests = executorsCtx.getRunningRequests().values(); + final Collection<IClientRequest> runningRequests = appCtx.getRequestTracker().getRunningRequests(); final String[] requests = runningRequests.stream().map(IClientRequest::toJson).toArray(String[]::new); ActiveRequestsResponse response = new ActiveRequestsResponse(reqId, requests); CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker(); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java index 943aad3..e4cef7f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java @@ -19,14 +19,12 @@ package org.apache.asterix.app.message; import org.apache.asterix.common.api.IClientRequest; +import org.apache.asterix.common.api.IRequestTracker; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.messaging.api.ICcAddressedMessage; import org.apache.asterix.common.utils.RequestStatus; -import org.apache.asterix.hyracks.bootstrap.CCApplication; import org.apache.asterix.messaging.CCMessageBroker; -import 
org.apache.asterix.translator.IStatementExecutorContext; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -47,23 +45,24 @@ @Override public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { - ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService(); - CCApplication application = (CCApplication) ccs.getApplication(); - IStatementExecutorContext executorsCtx = application.getStatementExecutorContext(); - IClientRequest req = executorsCtx.get(contextId); + final IRequestTracker requestTracker = appCtx.getRequestTracker(); + IClientRequest req = requestTracker.getByClientContextId(contextId); RequestStatus status; if (req == null) { LOGGER.log(Level.WARN, "No job found for context id " + contextId); status = RequestStatus.NOT_FOUND; } else { - try { - req.cancel(appCtx); - executorsCtx.remove(contextId); - status = RequestStatus.SUCCESS; - } catch (Exception e) { - LOGGER.log(Level.WARN, "unexpected exception thrown from cancel", e); - status = RequestStatus.FAILED; + if (!req.isCancellable()) { + status = RequestStatus.REJECTED; + } else { + try { + requestTracker.cancel(req.getId()); + status = RequestStatus.SUCCESS; + } catch (Exception e) { + LOGGER.log(Level.WARN, "unexpected exception thrown from cancel", e); + status = RequestStatus.FAILED; + } } } CancelQueryResponse response = new CancelQueryResponse(reqId, status); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java index 94d63a4..fdc4432 100644 --- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java @@ -30,6 +30,7 @@ import org.apache.asterix.app.cc.CCExtensionManager; import org.apache.asterix.app.translator.RequestParameters; import org.apache.asterix.common.api.IClusterManagementWork; +import org.apache.asterix.common.api.IRequestReference; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.context.IStorageComponentProvider; @@ -80,11 +81,12 @@ private final Map<String, String> optionalParameters; private final Map<String, byte[]> statementParameters; private final boolean multiStatement; + private final IRequestReference requestReference; public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang, String statementsText, SessionConfig sessionConfig, ResultProperties resultProperties, String clientContextID, String handleUrl, Map<String, String> optionalParameters, - Map<String, byte[]> statementParameters, boolean multiStatement) { + Map<String, byte[]> statementParameters, boolean multiStatement, IRequestReference requestReference) { this.requestNodeId = requestNodeId; this.requestMessageId = requestMessageId; this.lang = lang; @@ -96,6 +98,7 @@ this.optionalParameters = optionalParameters; this.statementParameters = statementParameters; this.multiStatement = multiStatement; + this.requestReference = requestReference; } @Override @@ -113,7 +116,6 @@ ILangCompilationProvider compilationProvider = ccExtMgr.getCompilationProvider(lang); IStorageComponentProvider storageComponentProvider = ccAppCtx.getStorageComponentProvider(); IStatementExecutorFactory statementExecutorFactory = ccApp.getStatementExecutorFactory(); - IStatementExecutorContext statementExecutorContext = 
ccApp.getStatementExecutorContext(); ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId); try { IParser parser = compilationProvider.getParserFactory().createParser(statementsText); @@ -132,9 +134,10 @@ compilationProvider, storageComponentProvider); final IStatementExecutor.Stats stats = new IStatementExecutor.Stats(); Map<String, IAObject> stmtParams = RequestParameters.deserializeParameterValues(statementParameters); - final IRequestParameters requestParameters = new RequestParameters(null, resultProperties, stats, - outMetadata, clientContextID, optionalParameters, stmtParams, multiStatement); - translator.compileAndExecute(ccApp.getHcc(), statementExecutorContext, requestParameters); + final IRequestParameters requestParameters = + new RequestParameters(requestReference, statementsText, null, resultProperties, stats, outMetadata, + clientContextID, optionalParameters, stmtParams, multiStatement); + translator.compileAndExecute(ccApp.getHcc(), requestParameters); outPrinter.close(); responseMsg.setResult(outWriter.toString()); responseMsg.setMetadata(outMetadata); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index 3e72b7d..724691c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -36,6 +36,8 @@ import org.apache.asterix.common.api.IDatasetMemoryManager; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.api.IPropertiesFactory; +import org.apache.asterix.common.api.IReceptionist; +import org.apache.asterix.common.api.IReceptionistFactory; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.config.ActiveProperties; import 
org.apache.asterix.common.config.AsterixExtension; @@ -146,6 +148,7 @@ private IHyracksClientConnection hcc; private IIndexCheckpointManagerProvider indexCheckpointManagerProvider; private IReplicaManager replicaManager; + private IReceptionist receptionist; public NCAppRuntimeContext(INCServiceContext ncServiceContext, List<AsterixExtension> extensions, IPropertiesFactory propertiesFactory) throws AsterixException, InstantiationException, @@ -175,7 +178,8 @@ } @Override - public void initialize(IRecoveryManagerFactory recoveryManagerFactory, boolean initialRun) throws IOException { + public void initialize(IRecoveryManagerFactory recoveryManagerFactory, IReceptionistFactory receptionistFactory, + boolean initialRun) throws IOException { ioManager = getServiceContext().getIoManager(); threadExecutor = MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory()); @@ -215,6 +219,7 @@ activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(), activeProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize(), this.ncServiceContext); + receptionist = receptionistFactory.create(); if (replicationProperties.isReplicationEnabled()) { replicationManager = new ReplicationManager(this, replicationProperties); @@ -533,4 +538,9 @@ public IPersistedResourceRegistry getPersistedResourceRegistry() { return persistedResourceRegistry; } + + @Override + public IReceptionist getReceptionist() { + return receptionist; + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index b0f3287..2a1ed4f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -52,7 +52,9 @@ import 
org.apache.asterix.app.active.FeedEventsListener; import org.apache.asterix.app.result.ResultHandle; import org.apache.asterix.app.result.ResultReader; +import org.apache.asterix.common.api.IClientRequest; import org.apache.asterix.common.api.IMetadataLockManager; +import org.apache.asterix.common.api.IRequestTracker; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; @@ -150,7 +152,7 @@ import org.apache.asterix.om.types.TypeSignature; import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory; import org.apache.asterix.translator.AbstractLangTranslator; -import org.apache.asterix.translator.ClientJobRequest; +import org.apache.asterix.translator.ClientRequest; import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement; import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement; import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement; @@ -160,7 +162,6 @@ import org.apache.asterix.translator.ExecutionPlansHtmlPrintUtil; import org.apache.asterix.translator.IRequestParameters; import org.apache.asterix.translator.IStatementExecutor; -import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.asterix.translator.NoOpStatementExecutorContext; import org.apache.asterix.translator.SessionConfig; import org.apache.asterix.translator.SessionOutput; @@ -261,30 +262,24 @@ } @Override - public void compileAndExecute(IHyracksClientConnection hcc, IStatementExecutorContext ctx, - IRequestParameters requestParameters) throws Exception { + public void compileAndExecute(IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception { if (!requestParameters.isMultiStatement()) { validateStatements(statements); } + trackRequest(requestParameters); int resultSetIdCounter = 0; FileSplit 
outputFile = null; IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE; IResultSerializerFactoryProvider resultSerializerFactoryProvider = ResultSerializerFactoryProvider.INSTANCE; - /* - * Since the system runs a large number of threads, when HTTP requests don't - * return, it becomes difficult to find the thread running the request to - * determine where it has stopped. Setting the thread name helps make that - * easier - */ String threadName = Thread.currentThread().getName(); - Thread.currentThread().setName(QueryTranslator.class.getSimpleName()); + Thread.currentThread().setName( + QueryTranslator.class.getSimpleName() + ":" + requestParameters.getRequestReference().getUuid()); Map<String, String> config = new HashMap<>(); final IResultSet resultSet = requestParameters.getResultSet(); final ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery(); final long maxResultReads = requestParameters.getResultProperties().getMaxReads(); final Stats stats = requestParameters.getStats(); final ResultMetadata outMetadata = requestParameters.getOutMetadata(); - final String clientContextId = requestParameters.getClientContextId(); final Map<String, IAObject> stmtParams = requestParameters.getStatementParameters(); try { for (Statement stmt : statements) { @@ -354,7 +349,7 @@ metadataProvider.setMaxResultReads(maxResultReads); } handleInsertUpsertStatement(metadataProvider, stmt, hcc, resultSet, resultDelivery, outMetadata, - stats, false, clientContextId, stmtParams, stmtRewriter); + stats, false, requestParameters, stmtParams, stmtRewriter); break; case DELETE: handleDeleteStatement(metadataProvider, stmt, hcc, false, stmtParams, stmtRewriter); @@ -389,7 +384,7 @@ resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED); metadataProvider.setMaxResultReads(maxResultReads); handleQuery(metadataProvider, (Query) stmt, hcc, resultSet, resultDelivery, outMetadata, stats, - clientContextId, ctx, 
stmtParams, stmtRewriter); + requestParameters, stmtParams, stmtRewriter); break; case COMPACT: handleCompactStatement(metadataProvider, stmt, hcc); @@ -406,8 +401,9 @@ // No op break; case EXTENSION: + //TODO remove deprecated statement executor context ((ExtensionStatement) stmt).handle(hcc, this, requestParameters, metadataProvider, - resultSetIdCounter, ctx); + resultSetIdCounter, NoOpStatementExecutorContext.INSTANCE); break; default: throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, stmt.getSourceLocation(), @@ -415,6 +411,10 @@ } } } finally { + // async queries are completed after their job completes + if (ResultDelivery.ASYNC != resultDelivery) { + appCtx.getRequestTracker().complete(requestParameters.getRequestReference().getUuid()); + } Thread.currentThread().setName(threadName); } } @@ -1856,7 +1856,7 @@ public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery, - ResultMetadata outMetadata, Stats stats, boolean compileOnly, String clientContextId, + ResultMetadata outMetadata, Stats stats, boolean compileOnly, IRequestParameters requestParameters, Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter) throws Exception { InsertStatement stmtInsertUpsert = (InsertStatement) stmt; String dataverseName = getActiveDataverse(stmtInsertUpsert.getDataverseName()); @@ -1901,7 +1901,7 @@ if (stmtInsertUpsert.getReturnExpression() != null) { deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats, - clientContextId, NoOpStatementExecutorContext.INSTANCE); + requestParameters, false); } else { locker.lock(); try { @@ -2454,8 +2454,8 @@ protected void handleQuery(MetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery, ResultMetadata outMetadata, Stats stats, - String clientContextId, 
IStatementExecutorContext ctx, Map<String, IAObject> stmtParams, - IStatementRewriter stmtRewriter) throws Exception { + IRequestParameters requestParameters, Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter) + throws Exception { final IMetadataLocker locker = new IMetadataLocker() { @Override public void lock() { @@ -2488,19 +2488,19 @@ } }; deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats, - clientContextId, ctx); + requestParameters, true); } private void deliverResult(IHyracksClientConnection hcc, IResultSet resultSet, IStatementCompiler compiler, MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery, - ResultMetadata outMetadata, Stats stats, String clientContextId, IStatementExecutorContext ctx) + ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable) throws Exception { final ResultSetId resultSetId = metadataProvider.getResultSetId(); switch (resultDelivery) { case ASYNC: MutableBoolean printed = new MutableBoolean(false); executorService.submit(() -> asyncCreateAndRunJob(hcc, compiler, locker, resultDelivery, - clientContextId, ctx, resultSetId, printed)); + requestParameters, cancellable, resultSetId, printed)); synchronized (printed) { while (!printed.booleanValue()) { printed.wait(); @@ -2515,7 +2515,7 @@ sessionOutput.release(); ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats, metadataProvider.findOutputRecordType()); - }, clientContextId, ctx); + }, requestParameters, cancellable, appCtx); break; case DEFERRED: createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> { @@ -2525,7 +2525,7 @@ outMetadata.getResultSets() .add(Triple.of(id, resultSetId, metadataProvider.findOutputRecordType())); } - }, clientContextId, ctx); + }, requestParameters, cancellable, appCtx); break; default: break; @@ -2552,7 +2552,7 @@ } private void 
asyncCreateAndRunJob(IHyracksClientConnection hcc, IStatementCompiler compiler, IMetadataLocker locker, - ResultDelivery resultDelivery, String clientContextId, IStatementExecutorContext ctx, + ResultDelivery resultDelivery, IRequestParameters requestParameters, boolean cancellable, ResultSetId resultSetId, MutableBoolean printed) { Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID); try { @@ -2564,7 +2564,7 @@ printed.setTrue(); printed.notify(); } - }, clientContextId, ctx); + }, requestParameters, cancellable, appCtx); } catch (Exception e) { if (Objects.equals(JobId.INVALID, jobId.getValue())) { // compilation failed @@ -2595,8 +2595,10 @@ private static void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId, IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer, - String clientContextId, IStatementExecutorContext ctx) throws Exception { - ClientJobRequest req = null; + IRequestParameters requestParameters, boolean cancellable, ICcApplicationContext appCtx) throws Exception { + final IRequestTracker requestTracker = appCtx.getRequestTracker(); + final ClientRequest clientRequest = + (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid()); locker.lock(); try { final JobSpecification jobSpec = compiler.compile(); @@ -2604,9 +2606,9 @@ return; } final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false); - if (ctx != null && clientContextId != null) { - req = new ClientJobRequest(ctx, clientContextId, jobId); - ctx.put(clientContextId, req); // Adds the running job into the context. + clientRequest.setJobId(jobId); + if (cancellable) { + clientRequest.markCancellable(); } if (jId != null) { jId.setValue(jobId); @@ -2619,11 +2621,11 @@ printer.print(jobId); } } finally { - locker.unlock(); - // No matter the job succeeds or fails, removes it into the context. 
- if (req != null) { - req.complete(); + // complete async jobs after their job completes + if (ResultDelivery.ASYNC == resultDelivery) { + requestTracker.complete(clientRequest.getId()); } + locker.unlock(); } } @@ -2941,6 +2943,13 @@ } } + protected void trackRequest(IRequestParameters requestParameters) throws HyracksDataException { + final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived( + requestParameters.getRequestReference(), requestParameters.getClientContextId(), + requestParameters.getStatement(), requestParameters.getOptionalParameters()); + appCtx.getRequestTracker().track(clientRequest); + } + public static void validateStatements(List<Statement> statements) throws CompilationException { if (statements.stream().filter(QueryTranslator::isNotAllowedMultiStatement).count() > 1) { throw new CompilationException(ErrorCode.UNSUPPORTED_MULTIPLE_STATEMENTS); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java index d0adcda..eda8a4a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/RequestParameters.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; +import org.apache.asterix.common.api.IRequestReference; import org.apache.asterix.external.parser.JSONDataParser; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.om.base.IAObject; @@ -41,6 +42,7 @@ public class RequestParameters implements IRequestParameters { + private final IRequestReference requestReference; private final IResultSet resultSet; private final ResultProperties resultProperties; private final Stats stats; @@ -49,10 +51,14 @@ private final String clientContextId; private final Map<String, IAObject> statementParameters; private 
final boolean multiStatement; + private final String statement; - public RequestParameters(IResultSet resultSet, ResultProperties resultProperties, Stats stats, - IStatementExecutor.ResultMetadata outMetadata, String clientContextId, - Map<String, String> optionalParameters, Map<String, IAObject> statementParameters, boolean multiStatement) { + public RequestParameters(IRequestReference requestReference, String statement, IResultSet resultSet, + ResultProperties resultProperties, Stats stats, IStatementExecutor.ResultMetadata outMetadata, + String clientContextId, Map<String, String> optionalParameters, Map<String, IAObject> statementParameters, + boolean multiStatement) { + this.requestReference = requestReference; + this.statement = statement; this.resultSet = resultSet; this.resultProperties = resultProperties; this.stats = stats; @@ -103,6 +109,16 @@ return statementParameters; } + @Override + public String getStatement() { + return statement; + } + + @Override + public IRequestReference getRequestReference() { + return requestReference; + } + public static Map<String, byte[]> serializeParameterValues(Map<String, JsonNode> inParams) throws HyracksDataException { if (inParams == null || inParams.isEmpty()) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index 24a1463..ce98a03 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -33,7 +33,6 @@ import java.util.concurrent.ConcurrentMap; import org.apache.asterix.api.http.IQueryWebServerRegistrant; -import org.apache.asterix.api.http.ctx.StatementExecutorContext; import org.apache.asterix.api.http.server.ActiveStatsApiServlet; import org.apache.asterix.api.http.server.ApiServlet; import 
org.apache.asterix.api.http.server.CcQueryCancellationServlet; @@ -55,6 +54,8 @@ import org.apache.asterix.app.replication.NcLifecycleCoordinator; import org.apache.asterix.common.api.AsterixThreadFactory; import org.apache.asterix.common.api.INodeJobTracker; +import org.apache.asterix.common.api.IReceptionistFactory; +import org.apache.asterix.translator.Receptionist; import org.apache.asterix.common.cluster.IGlobalRecoveryManager; import org.apache.asterix.common.config.AsterixExtension; import org.apache.asterix.common.config.ExtensionProperties; @@ -77,7 +78,6 @@ import org.apache.asterix.metadata.lock.MetadataLockManager; import org.apache.asterix.runtime.job.resource.JobCapacityController; import org.apache.asterix.runtime.utils.CcApplicationContext; -import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.asterix.translator.IStatementExecutorFactory; import org.apache.asterix.util.MetadataBuiltinFunctions; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -110,7 +110,6 @@ protected ICCServiceContext ccServiceCtx; protected CCExtensionManager ccExtensionManager; protected IStorageComponentProvider componentProvider; - protected StatementExecutorContext statementExecutorCtx; protected WebManager webManager; protected ICcApplicationContext appCtx; private IJobCapacityController jobCapacityController; @@ -154,8 +153,8 @@ extensions.addAll(getExtensions()); ccExtensionManager = new CCExtensionManager(extensions); IGlobalRecoveryManager globalRecoveryManager = createGlobalRecoveryManager(); - statementExecutorCtx = new StatementExecutorContext(); - appCtx = createApplicationContext(libraryManager, globalRecoveryManager, lifecycleCoordinator); + appCtx = createApplicationContext(libraryManager, globalRecoveryManager, lifecycleCoordinator, + () -> new Receptionist("CC")); appCtx.setExtensionManager(ccExtensionManager); final CCConfig ccConfig = controllerService.getCCConfig(); if 
(System.getProperty("java.rmi.server.hostname") == null) { @@ -182,11 +181,11 @@ } protected ICcApplicationContext createApplicationContext(ILibraryManager libraryManager, - IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator lifecycleCoordinator) - throws AlgebricksException, IOException { + IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator lifecycleCoordinator, + IReceptionistFactory receptionistFactory) throws AlgebricksException, IOException { return new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, () -> MetadataManager.INSTANCE, globalRecoveryManager, lifecycleCoordinator, new ActiveNotificationHandler(), componentProvider, - new MetadataLockManager()); + new MetadataLockManager(), receptionistFactory); } protected IGlobalRecoveryManager createGlobalRecoveryManager() throws Exception { @@ -243,7 +242,6 @@ jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, appCtx); jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR, ccServiceCtx.getControllerService().getExecutor()); - jsonAPIServer.setAttribute(ServletConstants.RUNNING_QUERIES_ATTR, statementExecutorCtx); jsonAPIServer.setAttribute(ServletConstants.SERVICE_CONTEXT_ATTR, ccServiceCtx); // Other APIs. 
@@ -320,10 +318,6 @@ public IStatementExecutorFactory getStatementExecutorFactory() { return ccExtensionManager.getStatementExecutorFactory(ccServiceCtx.getControllerService().getExecutor()); - } - - public IStatementExecutorContext getStatementExecutorContext() { - return statementExecutorCtx; } @Override diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index 3857ac5..1112507 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -34,6 +34,8 @@ import org.apache.asterix.common.api.AsterixThreadFactory; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.api.IPropertiesFactory; +import org.apache.asterix.common.api.IReceptionistFactory; +import org.apache.asterix.translator.Receptionist; import org.apache.asterix.common.config.AsterixExtension; import org.apache.asterix.common.config.ExternalProperties; import org.apache.asterix.common.config.GlobalConfig; @@ -128,7 +130,8 @@ } updateOnNodeJoin(); } - runtimeContext.initialize(getRecoveryManagerFactory(), runtimeContext.getNodeProperties().isInitialRun()); + runtimeContext.initialize(getRecoveryManagerFactory(), getReceptionistFactory(), + runtimeContext.getNodeProperties().isInitialRun()); MessagingProperties messagingProperties = runtimeContext.getMessagingProperties(); NCMessageBroker messageBroker = new NCMessageBroker(controllerService, messagingProperties); this.ncServiceCtx.setMessageBroker(messageBroker); @@ -155,6 +158,10 @@ return RecoveryManager::new; } + protected IReceptionistFactory getReceptionistFactory() { + return () -> new Receptionist(nodeId); + } + @Override protected void configureLoggingLevel(Level level) { super.configureLoggingLevel(level); diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java index eae82af..1a526b2 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java @@ -25,14 +25,15 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap; -import org.apache.asterix.api.http.ctx.StatementExecutorContext; import org.apache.asterix.api.http.server.CcQueryCancellationServlet; import org.apache.asterix.api.http.server.ServletConstants; +import org.apache.asterix.common.api.RequestReference; import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.translator.ClientJobRequest; -import org.apache.asterix.translator.IStatementExecutorContext; +import org.apache.asterix.runtime.utils.RequestTracker; +import org.apache.asterix.translator.ClientRequest; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.http.api.IServletRequest; @@ -49,15 +50,14 @@ @Test public void testDelete() throws Exception { ICcApplicationContext appCtx = mock(ICcApplicationContext.class); + RequestTracker tracker = new RequestTracker(appCtx); + Mockito.when(appCtx.getRequestTracker()).thenReturn(tracker); // Creates a query cancellation servlet. CcQueryCancellationServlet cancellationServlet = new CcQueryCancellationServlet(new ConcurrentHashMap<>(), appCtx, new String[] { "/" }); // Adds mocked Hyracks client connection into the servlet context. 
IHyracksClientConnection mockHcc = mock(IHyracksClientConnection.class); cancellationServlet.ctx().put(ServletConstants.HYRACKS_CONNECTION_ATTR, mockHcc); - // Adds a query context into the servlet context. - IStatementExecutorContext queryCtx = new StatementExecutorContext(); - cancellationServlet.ctx().put(ServletConstants.RUNNING_QUERIES_ATTR, queryCtx); Mockito.when(appCtx.getHcc()).thenReturn(mockHcc); // Tests the case that query is not in the map. IServletRequest mockRequest = mockRequest("1"); @@ -65,8 +65,12 @@ cancellationServlet.handle(mockRequest, mockResponse); verify(mockResponse, times(1)).setStatus(HttpResponseStatus.NOT_FOUND); + final RequestReference requestReference = RequestReference.of("1", "node1", System.currentTimeMillis()); + ClientRequest request = new ClientRequest(requestReference, "1", "select 1;", new HashMap<>()); + request.setJobId(new JobId(1)); + request.markCancellable(); + tracker.track(request); // Tests the case that query is in the map. - queryCtx.put("1", new ClientJobRequest(queryCtx, "1", new JobId(1))); cancellationServlet.handle(mockRequest, mockResponse); verify(mockResponse, times(1)).setStatus(HttpResponseStatus.OK); @@ -76,7 +80,11 @@ verify(mockResponse, times(1)).setStatus(HttpResponseStatus.BAD_REQUEST); // Tests the case that the job cancellation hit some exception from Hyracks. 
- queryCtx.put("2", new ClientJobRequest(queryCtx, "2", new JobId(2))); + final RequestReference requestReference2 = RequestReference.of("2", "node1", System.currentTimeMillis()); + ClientRequest request2 = new ClientRequest(requestReference2, "2", "select 1;", new HashMap<>()); + request2.setJobId(new JobId(2)); + request2.markCancellable(); + tracker.track(request2); Mockito.doThrow(new Exception()).when(mockHcc).cancelJob(any()); mockRequest = mockRequest("2"); cancellationServlet.handle(mockRequest, mockResponse); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java index 48d11ee..2a945e9 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java @@ -70,7 +70,7 @@ Thread.sleep(10); // Cancels the query request while the query is executing. 
int rc = cancelQuery(getEndpoint(Servlets.RUNNING_REQUESTS), newParams); - Assert.assertTrue(rc == 200 || rc == 404); + Assert.assertTrue(rc == 200 || rc == 404 || rc == 403); if (rc == 200) { break; } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp index 3a29ef2..7eee42e 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp @@ -18,5 +18,5 @@ */ -- param client_context_id=ensure_running_query -- polltimeoutsecs=15 -SELECT VALUE rqst FROM active_requests() rqst +SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid FROM active_requests() rqst WHERE rqst.clientContextID = 'sleep_async_query'; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex index 92f4746..e31fe3b 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex @@ -1 +1 @@ -/\{ "clientContextID": "sleep_async_query", "jobId": "JID:.*", "requestTime": ".*" \}/ \ No newline at end of file +/\{ "cancellable": true, "jobId": "JID:.*", "state": "running", "uuid": ".*" \}/ \ No newline at end of file diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java index 6b5b472..9e5801f 100644 --- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java @@ -76,4 +76,6 @@ * @return the cluster coordination service. */ ICoordinationService getCoordinationService(); + + IReceptionist getReceptionist(); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java index 53771d5..430cd2a 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java @@ -24,11 +24,35 @@ public interface IClientRequest { /** + * A system wide unique id representing this {@link IClientRequest} + * + * @return the system request id + */ + String getId(); + + /** + * A user supplied id representing this {@link IClientRequest} + * + * @return the client supplied request id + */ + String getClientContextId(); + + /** * Mark the request as complete, non-cancellable anymore */ void complete(); /** + * Mark the request as cancellable + */ + void markCancellable(); + + /** + * @return true if the request can be cancelled. Otherwise false. 
+ */ + boolean isCancellable(); + + /** * Cancel a request * * @param appCtx diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java index 8648c5b..c6e7439 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java @@ -67,8 +67,8 @@ IResourceIdFactory getResourceIdFactory(); - void initialize(IRecoveryManagerFactory recoveryManagerFactory, boolean initialRun) - throws IOException, AlgebricksException; + void initialize(IRecoveryManagerFactory recoveryManagerFactory, IReceptionistFactory receptionistFactory, + boolean initialRun) throws IOException, AlgebricksException; void setShuttingdown(boolean b); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java new file mode 100644 index 0000000..51df306 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.api; + +import java.util.Map; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.http.api.IServletRequest; + +public interface IReceptionist { + + /** + * Generates a request reference based on {@code request} + * + * @param request + * @return a request reference representing the request + */ + IRequestReference welcome(IServletRequest request); + + /** + * Generates a {@link IClientRequest} based on the requests parameters + * + * @param requestRef + * @param clientContextId + * @param statement + * @param getOptionalParameters + * @return A client request + * @throws HyracksDataException + */ + IClientRequest requestReceived(IRequestReference requestRef, String clientContextId, String statement, + Map<String, String> getOptionalParameters) throws HyracksDataException; +} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionistFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionistFactory.java new file mode 100644 index 0000000..6784f26 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionistFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.api; + +@FunctionalInterface +public interface IReceptionistFactory { + + /** + * Creates a {@link IReceptionist} + * + * @return a receptionist + */ + IReceptionist create(); +} \ No newline at end of file diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestReference.java new file mode 100644 index 0000000..8a25ed2 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestReference.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.common.api; + +import java.io.Serializable; + +public interface IRequestReference extends Serializable { + + /** + * Gets the system wide unique request id. + * + * @return the requests id. + */ + String getUuid(); + + /** + * Get the node name which received this requests. + * + * @return the node name + */ + String getNode(); + + /** + * Gets the system time at which the request was received. + * + * @return the time at which the request was received. + */ + long getTime(); +} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java new file mode 100644 index 0000000..01bcf82 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.common.api; + +import java.util.Collection; + +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public interface IRequestTracker { + + /** + * Starts tracking {@code request} + * + * @param request + */ + void track(IClientRequest request); + + /** + * Gets a client request by {@code requestId} + * + * @param requestId + * @return the client request if found. Otherwise null. + */ + IClientRequest get(String requestId); + + /** + * Gets a client request by {@code clientContextId} + * + * @param clientContextId + * @return the client request if found. Otherwise null. + */ + IClientRequest getByClientContextId(String clientContextId); + + /** + * Cancels the client request with id {@code requestId} if found. + * + * @param requestId + * @throws HyracksDataException + */ + void cancel(String requestId) throws HyracksDataException; + + /** + * Completes the request with id {@code requestId} + * + * @param requestId + */ + void complete(String requestId); + + /** + * Gets the currently running requests + * + * @return the currently running requests + */ + Collection<IClientRequest> getRunningRequests(); +} \ No newline at end of file diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/RequestReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/RequestReference.java new file mode 100644 index 0000000..eb08c09 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/RequestReference.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.api; + +import org.apache.hyracks.util.JSONUtil; + +import com.fasterxml.jackson.databind.node.ObjectNode; + +public class RequestReference implements IRequestReference { + + private static final long serialVersionUID = 1L; + private String uuid; + private String node; + private long time; + private String userAgent; + private String remoteAddr; + + private RequestReference(String uuid, String node, long time) { + this.uuid = uuid; + this.node = node; + this.time = time; + } + + public static RequestReference of(String uuid, String node, long time) { + return new RequestReference(uuid, node, time); + } + + @Override + public String getUuid() { + return uuid; + } + + @Override + public long getTime() { + return time; + } + + public String getNode() { + return node; + } + + public String getUserAgent() { + return userAgent; + } + + public void setUserAgent(String userAgent) { + this.userAgent = userAgent; + } + + public void setRemoteAddr(String remoteAddr) { + this.remoteAddr = remoteAddr; + } + + public String getRemoteAddr() { + return remoteAddr; + } + + @Override + public String toString() { + final ObjectNode object = JSONUtil.createObject(); + object.put("uuid", uuid); + object.put("node", node); + object.put("time", time); + object.put("userAgent", userAgent); + object.put("remoteAddr", remoteAddr); + return JSONUtil.convertNodeOrThrow(object); + } +} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java index 81ae3e1..e4b70f6 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java @@ -21,6 +21,7 @@ import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.api.IMetadataLockManager; import org.apache.asterix.common.api.INodeJobTracker; +import org.apache.asterix.common.api.IRequestTracker; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.cluster.IGlobalRecoveryManager; import org.apache.asterix.common.config.ExtensionProperties; @@ -133,4 +134,11 @@ * @return the compression manager */ ICompressionManager getCompressionManager(); + + /** + * Gets the request tracker. + * + * @return the request tracker. + */ + IRequestTracker getRequestTracker(); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/RequestStatus.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/RequestStatus.java index 52dfe90..741af83 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/RequestStatus.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/RequestStatus.java @@ -23,7 +23,8 @@ public enum RequestStatus { SUCCESS, FAILED, - NOT_FOUND; + NOT_FOUND, + REJECTED; public HttpResponseStatus toHttpResponse() { switch (this) { @@ -33,6 +34,8 @@ return HttpResponseStatus.INTERNAL_SERVER_ERROR; case NOT_FOUND: return HttpResponseStatus.NOT_FOUND; + case REJECTED: + return HttpResponseStatus.FORBIDDEN; default: throw new IllegalStateException("Unrecognized status: " + this); } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java index 48463e8..b92a15e 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java @@ -24,6 +24,9 @@ import org.apache.asterix.common.api.ICoordinationService; import org.apache.asterix.common.api.IMetadataLockManager; import org.apache.asterix.common.api.INodeJobTracker; +import org.apache.asterix.common.api.IReceptionist; +import org.apache.asterix.common.api.IReceptionistFactory; +import org.apache.asterix.common.api.IRequestTracker; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.cluster.IGlobalRecoveryManager; import org.apache.asterix.common.config.ActiveProperties; @@ -90,12 +93,15 @@ private final INodeJobTracker nodeJobTracker; private final ITxnIdFactory txnIdFactory; private final ICompressionManager compressionManager; + private final IReceptionist receptionist; + private final IRequestTracker requestTracker; public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc, ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator ftStrategy, IJobLifecycleListener activeLifeCycleListener, IStorageComponentProvider storageComponentProvider, - IMetadataLockManager mdLockManager) throws AlgebricksException, IOException { + IMetadataLockManager mdLockManager, IReceptionistFactory receptionistFactory) + throws AlgebricksException, IOException { this.ccServiceCtx = ccServiceCtx; this.hcc = hcc; this.libraryManager = libraryManager; @@ -125,7 +131,8 @@ nodeJobTracker = new NodeJobTracker(); txnIdFactory = new BulkTxnIdFactory(); compressionManager = new CompressionManager(storageProperties); - + receptionist = 
receptionistFactory.create(); + requestTracker = new RequestTracker(this); } @Override @@ -283,4 +290,14 @@ public ICompressionManager getCompressionManager() { return compressionManager; } + + @Override + public IReceptionist getReceptionist() { + return receptionist; + } + + @Override + public IRequestTracker getRequestTracker() { + return requestTracker; + } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java new file mode 100644 index 0000000..f651eb36 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.runtime.utils; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.asterix.common.api.IClientRequest; +import org.apache.asterix.common.api.IRequestTracker; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class RequestTracker implements IRequestTracker { + + private final Map<String, IClientRequest> runningRequests = new ConcurrentHashMap<>(); + private final Map<String, IClientRequest> clientIdRequests = new ConcurrentHashMap<>(); + private final ICcApplicationContext ccAppCtx; + + public RequestTracker(ICcApplicationContext ccAppCtx) { + this.ccAppCtx = ccAppCtx; + } + + @Override + public IClientRequest get(String requestId) { + return runningRequests.get(requestId); + } + + @Override + public IClientRequest getByClientContextId(String clientContextId) { + return clientIdRequests.get(clientContextId); + } + + @Override + public void track(IClientRequest request) { + runningRequests.put(request.getId(), request); + if (request.getClientContextId() != null) { + clientIdRequests.put(request.getClientContextId(), request); + } + } + + @Override + public void cancel(String requestId) throws HyracksDataException { + final IClientRequest request = runningRequests.get(requestId); + if (request == null) { + return; + } + if (!request.isCancellable()) { + throw new IllegalStateException("Request " + request.getId() + " cannot be cancelled"); + } + cancel(request); + } + + @Override + public void complete(String requestId) { + final IClientRequest request = runningRequests.get(requestId); + if (request != null) { + request.complete(); + untrack(request); + } + } + + @Override + public synchronized Collection<IClientRequest> getRunningRequests() { + return Collections.unmodifiableCollection(runningRequests.values()); + } + + private void 
cancel(IClientRequest request) throws HyracksDataException { + request.cancel(ccAppCtx); + untrack(request); + } + + private void untrack(IClientRequest request) { + runningRequests.remove(request.getId()); + final String clientContextId = request.getClientContextId(); + if (clientContextId != null) { + clientIdRequests.remove(request.getClientContextId()); + } + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java index 006659b..7decbe0 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/JSONUtil.java @@ -63,6 +63,14 @@ return PRETTY_SORTED_WRITER.writeValueAsString(SORTED_MAPPER.treeToValue(node, Object.class)); } + public static String convertNodeOrThrow(final JsonNode node) { + try { + return convertNode(node); + } catch (JsonProcessingException e) { + throw new IllegalStateException(e); + } + } + public static void writeNode(final Writer writer, final JsonNode node) throws IOException { PRETTY_SORTED_WRITER.writeValue(writer, SORTED_MAPPER.treeToValue(node, Object.class)); } -- To view, visit https://asterix-gerrit.ics.uci.edu/3163 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: If08ecd91c55881743b2ecf40a628fa3d4166c554 Gerrit-PatchSet: 4 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
