>From Ali Alsuliman <[email protected]>:
Ali Alsuliman has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21269?usp=email )
Change subject: max result + untrack for deferred mode
......................................................................
max result + untrack for deferred mode
Change-Id: I8b2182582fce9ac588d2259f5dfd282183404c75
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/deferred/deferred.3.get.http
A
asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/deferred/deferred.3.ignore
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IJobResultCallback.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
10 files changed, 93 insertions(+), 11 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/69/21269/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
index 5c89b2e..e5a4034 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java
@@ -59,6 +59,14 @@
}
}
+ @Override
+ public void notifyResultSweep(JobId jobId, ResultJobRecord
resultJobRecord) {
+ String requestId = resultJobRecord.getRequestId();
+ if (requestId != null) {
+ appCtx.getRequestTracker().notifyResultSweep(jobId, requestId);
+ }
+ }
+
private void updateResultMetadata(JobId jobId, ResultJobRecord
resultJobRecord) {
final ResultSetMetaData resultSetMetaData =
resultJobRecord.getResultSetMetaData();
if (resultSetMetaData == null) {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index a9320f2..86b71d4 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -562,8 +562,7 @@
metadataProvider.setResultSetId(new
ResultSetId(resultSetIdCounter.getAndInc()));
metadataProvider.setResultAsyncMode(
resultDelivery == ResultDelivery.ASYNC ||
resultDelivery == ResultDelivery.DEFERRED);
- metadataProvider
-
.setMaxResultReads(!sessionConfig.isIncludeHost() ? UNLIMITED_READS :
maxResultReads);
+
metadataProvider.setMaxResultReads(getMaxResultReads(resultDelivery,
maxResultReads));
if (stats.getProfileType() == Stats.ProfileType.FULL) {
this.jobFlags.add(JobFlag.PROFILE_RUNTIME);
}
@@ -611,6 +610,11 @@
}
}
+ private long getMaxResultReads(ResultDelivery mode, long maxResultReads) {
+ // !sessionConfig.isIncludeHost() typically means the request is via
the new request API
+ return mode == ResultDelivery.ASYNC && !sessionConfig.isIncludeHost()
? UNLIMITED_READS : maxResultReads;
+ }
+
protected void configureMetadataProvider(MetadataProvider
metadataProvider, Map<String, String> config,
Counter resultSetIdCounter, FileSplit outputFile,
IRequestParameters requestParameters,
Statement statement) {
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/deferred/deferred.3.get.http
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/deferred/deferred.3.get.http
new file mode 100644
index 0000000..3d681d2
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/async-deferred-improved/deferred/deferred.3.get.http
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// statuscode 404
+$result
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/deferred/deferred.3.ignore
b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/deferred/deferred.3.ignore
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/async-deferred-improved/deferred/deferred.3.ignore
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
index 8f3ff6d..b5592fe 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
@@ -23,6 +23,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IJobLifecycleListener;
+import org.apache.hyracks.api.job.JobId;
public interface IRequestTracker extends IJobLifecycleListener {
@@ -116,4 +117,12 @@
*/
Optional<IClientRequest> getAsyncOrDeferredRequest(String requestId);
+ /**
+ * Notifies this tracker that the result of job {@code jobId} has been
swept.
+ *
+ * @param jobId the job id whose result has been swept
+ * @param requestId the request id associated with the job
+ */
+ void notifyResultSweep(JobId jobId, String requestId);
+
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index 1dbb6b2..f596e07 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -101,6 +101,11 @@
}
@Override
+ public void notifyResultSweep(JobId jobId, String requestId) {
+ removeAsyncOrDeferredRequest(requestId);
+ }
+
+ @Override
public boolean cancel(String requestId) throws HyracksDataException {
final IClientRequest request = runningRequests.get(requestId);
if (request == null) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IJobResultCallback.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IJobResultCallback.java
index fe7eb36..546522c 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IJobResultCallback.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IJobResultCallback.java
@@ -22,6 +22,18 @@
public interface IJobResultCallback {
+ IJobResultCallback INSTANCE_NO_OP = new IJobResultCallback() {
+ @Override
+ public void completed(JobId jobId, ResultJobRecord resultJobRecord) {
+
+ }
+
+ @Override
+ public void notifyResultSweep(JobId jobId, ResultJobRecord
resultJobRecord) {
+
+ }
+ };
+
/**
* Notifies this callback that writing the result of job {@code jobId} has
been completed successfully.
*
@@ -29,4 +41,12 @@
* @param resultJobRecord
*/
void completed(JobId jobId, ResultJobRecord resultJobRecord);
+
+ /**
+ * Notifies this callback that the result of job {@code jobId} has been
swept.
+ *
+ * @param jobId the job id whose result has been swept
+ * @param resultJobRecord the result record of the job
+ */
+ void notifyResultSweep(JobId jobId, ResultJobRecord resultJobRecord);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
index 4308d64..0c57931 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
@@ -83,15 +83,17 @@
private static final Logger LOGGER = LogManager.getLogger();
private final long timestamp;
private final long resultTtlInNanos; // per-request TTL in nanoseconds, -1
for system default
+ private final String requestId;
private long jobStartTime;
private long jobEndTime;
- private Status status;
+ private final Status status;
private ResultSetId rsId;
private ResultSetMetaData resultSetMetaData;
private long resultCount;
- private boolean resultSetOrdered;
+ private final boolean resultSetOrdered;
- public ResultJobRecord(boolean resultSetOrdered, long resultTtlInNanos) {
+ public ResultJobRecord(boolean resultSetOrdered, long resultTtlInNanos,
String requestId) {
+ this.requestId = requestId;
this.timestamp = System.nanoTime();
this.resultTtlInNanos = resultTtlInNanos;
this.status = new Status();
@@ -161,6 +163,10 @@
return resultTtlInNanos;
}
+ public String getRequestId() {
+ return requestId;
+ }
+
public Status getStatus() {
return status;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
index 22e4bba..b5e90eb 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
@@ -123,9 +123,7 @@
@Override
public IJobResultCallback getJobResultCallback() {
- return (jobId, resultJobRecord) -> {
- // no op
- };
+ return IJobResultCallback.INSTANCE_NO_OP;
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index 93ee281..dfd10a0 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -92,7 +92,8 @@
if (resultTtl == null) {
resultTtl = -1L;
}
- jobResultLocations.put(jobId, new JobResultInfo(new
ResultJobRecord(partitionsOrdered, resultTtl), null));
+ jobResultLocations.put(jobId,
+ new JobResultInfo(new ResultJobRecord(partitionsOrdered,
resultTtl, spec.getRequestId()), null));
}
@Override
@@ -230,8 +231,19 @@
}
@Override
- public synchronized void sweep(JobId jobId) {
- jobResultLocations.remove(jobId);
+ public void sweep(JobId jobId) {
+ JobResultInfo removedJob;
+ synchronized (this) {
+ removedJob = jobResultLocations.remove(jobId);
+ }
+ if (removedJob != null) {
+ ResultJobRecord rec = removedJob.getRecord();
+ try {
+ jobResultCallback.notifyResultSweep(jobId, rec);
+ } catch (Throwable t) {
+ LOGGER.warn("failed to notify result sweep for job {}, req
{}", jobId, rec.getRequestId(), t);
+ }
+ }
}
@Override
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21269?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: I8b2182582fce9ac588d2259f5dfd282183404c75
Gerrit-Change-Number: 21269
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <[email protected]>