From Ali Alsuliman <[email protected]>:
Ali Alsuliman has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21275?usp=email )
Change subject: [ASTERIXDB-3649][HYR] Measure result expiry since job complete
......................................................................
[ASTERIXDB-3649][HYR] Measure result expiry since job complete
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
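The result sweeper uses the job creation time to measure
job expiry. This patch changes it so that it measures
job expiry from the time the job completes. Results of
jobs that have not completed yet are never considered
expired. On the NC, results of jobs that do not terminate
successfully are swept as soon as the joblet is cleaned up.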
Ext-ref: MB-71997
Change-Id: I33eecf8f907e009da05482255e4f214a39e601c2
---
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultStateRecord.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/result/AbstractResultManager.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultSetMap.java
6 files changed, 70 insertions(+), 10 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/75/21275/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultStateRecord.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultStateRecord.java
index 7089c9e..752ff4f 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultStateRecord.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/IResultStateRecord.java
@@ -19,10 +19,20 @@
package org.apache.hyracks.api.result;
public interface IResultStateRecord {
+
+ /**
+ * Returns the timestamp in nanoseconds when the record is created.
+ */
long getTimestamp();
/**
* Returns the per-request result TTL in nanoseconds, or -1 if system
default should be used.
*/
long getResultTtlInNanos();
+
+ /**
+ * Returns the {@link System#nanoTime()} reading taken when the record completed, or 0
if not completed yet (0 is the only "not completed" value; readings may be negative).
+ */
+ long getCompleteTimestamp();
+
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
index 0c57931..eae1ba0 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
@@ -85,7 +85,7 @@
private final long resultTtlInNanos; // per-request TTL in nanoseconds, -1
for system default
private final String requestId;
private long jobStartTime;
- private long jobEndTime;
+ private volatile long jobEndTime;
private final Status status;
private ResultSetId rsId;
private ResultSetMetaData resultSetMetaData;
@@ -113,8 +113,11 @@
updateState(State.RUNNING);
}
- public void finish(JobStatus jobStatus) {
+ public void finish() {
jobEndTime = System.nanoTime();
+ }
+
+ public void finishWithStatus(JobStatus jobStatus) {
if (jobStatus != null && (status.state == State.RUNNING ||
status.state == State.IDLE)) {
switch (jobStatus) {
case TERMINATED -> updateState(State.SUCCESS);
@@ -159,6 +162,11 @@
}
@Override
+ public long getCompleteTimestamp() {
+ return jobEndTime;
+ }
+
+ @Override
public long getResultTtlInNanos() {
return resultTtlInNanos;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index dfd10a0..736a435 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -104,15 +104,18 @@
@Override
public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus
jobStatus, List<Exception> exceptions)
throws HyracksException {
+ ResultJobRecord resultJobRecord = getResultJobRecord(jobId);
+ if (resultJobRecord != null) {
+ resultJobRecord.finish(); // set jobEndTime before the callback/failure report observe the record
+ }
if (exceptions == null || exceptions.isEmpty()) {
- final ResultJobRecord resultJobRecord = getResultJobRecord(jobId);
if (resultJobRecord == null) {
return;
}
- resultJobRecord.finish(jobStatus);
+ resultJobRecord.finishWithStatus(jobStatus);
jobResultCallback.completed(jobId, resultJobRecord);
} else {
- reportJobFailure(jobId, exceptions);
+ reportJobFailure(jobId, exceptions, resultJobRecord);
}
}
@@ -183,6 +186,10 @@
@Override
public synchronized void reportJobFailure(JobId jobId, List<Exception>
exceptions) {
ResultJobRecord rjr = getResultJobRecord(jobId);
+ reportJobFailure(jobId, exceptions, rjr);
+ }
+
+ private synchronized void reportJobFailure(JobId jobId, List<Exception>
exceptions, ResultJobRecord rjr) {
if (logFailure(rjr)) {
LOGGER.debug("job {} failed and is being reported to {}", jobId,
getClass().getSimpleName());
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/result/AbstractResultManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/result/AbstractResultManager.java
index 76b9560..46e6f2d 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/result/AbstractResultManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/result/AbstractResultManager.java
@@ -30,8 +30,8 @@
private final long nanoResultTTL;
- protected AbstractResultManager(long resultTTL) {
- this.nanoResultTTL = TimeUnit.MILLISECONDS.toNanos(resultTTL);
+ protected AbstractResultManager(long resultTTLMillis) {
+ this.nanoResultTTL = TimeUnit.MILLISECONDS.toNanos(resultTTLMillis);
}
@Override
@@ -50,8 +50,13 @@
}
private boolean hasExpired(IResultStateRecord state, long currentTime) {
+ long completeTimestamp = state.getCompleteTimestamp();
+ if (completeTimestamp == 0) {
+ // 0 = not completed yet (nanoTime() readings may be negative, so only 0 is the sentinel)
+ return false;
+ }
// Use per-request TTL if set (> 0), otherwise use system default
long ttl = state.getResultTtlInNanos() > 0 ?
state.getResultTtlInNanos() : nanoResultTTL;
- return currentTime - state.getTimestamp() - ttl > 0;
+ return currentTime - completeTimestamp - ttl > 0;
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 3b4e54b..35966e1 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -54,6 +54,9 @@
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.api.resources.IDeallocatable;
import org.apache.hyracks.api.resources.memory.IFrameProfiler;
+import org.apache.hyracks.api.result.IResultPartitionManager;
+import org.apache.hyracks.api.result.IResultStateRecord;
+import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.control.common.deployment.DeploymentUtils;
import org.apache.hyracks.control.common.job.PartitionRequest;
import org.apache.hyracks.control.common.job.PartitionState;
@@ -63,6 +66,7 @@
import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
import org.apache.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
import org.apache.hyracks.control.nc.resources.memory.FrameManager;
+import org.apache.hyracks.control.nc.result.ResultSetMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -269,7 +273,23 @@
LOGGER.trace(() -> "Freeing leaked " + stillAllocated + " bytes");
serviceCtx.getMemoryManager().deallocate(stillAllocated);
}
- nodeController.getExecutor().execute(() ->
deallocatableRegistry.close());
+ nodeController.getExecutor().execute(() -> {
+ try {
+ InvokeUtil.tryWithCleanups(deallocatableRegistry::close, () ->
{
+ IResultPartitionManager resultPartitionManager =
nodeController.getResultPartitionManager();
+ if (cleanupStatus != JobStatus.TERMINATED) {
+ resultPartitionManager.sweep(jobId);
+ } else {
+ IResultStateRecord state =
resultPartitionManager.getState(jobId);
+ if (state instanceof ResultSetMap) {
+ ((ResultSetMap) state).recordCompleteTimestamp();
+ }
+ }
+ });
+ } catch (Exception e) {
+ LOGGER.warn("Failure during joblet clean-up", e);
+ }
+ });
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultSetMap.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultSetMap.java
index c47b53d..bb73f84 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultSetMap.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/result/ResultSetMap.java
@@ -28,7 +28,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-class ResultSetMap implements IResultStateRecord, Serializable {
+public class ResultSetMap implements IResultStateRecord, Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = LogManager.getLogger();
@@ -36,6 +36,7 @@
private final long timestamp;
private final long resultTtlInNanos;
private final HashMap<ResultSetId, ResultState[]> resultStateMap;
+ private volatile long completeTimestamp;
ResultSetMap(long resultTtlInNanos) {
timestamp = System.nanoTime();
@@ -49,10 +50,19 @@
}
@Override
+ public long getCompleteTimestamp() {
+ return completeTimestamp;
+ }
+
+ @Override
public long getResultTtlInNanos() {
return resultTtlInNanos;
}
+ public void recordCompleteTimestamp() {
+ completeTimestamp = System.nanoTime();
+ }
+
ResultState[] getResultStates(ResultSetId rsId) {
return resultStateMap.get(rsId);
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/21275?usp=email
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings?usp=email
Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: lumina
Gerrit-Change-Id: I33eecf8f907e009da05482255e4f214a39e601c2
Gerrit-Change-Number: 21275
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <[email protected]>