Steven Jacobs has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1656
Change subject: Fix Pre-Distributed Jobs
......................................................................
Fix Pre-Distributed Jobs
This fix prevents ResultStateSweeper from deinitializing
pre-distributed jobs before they are dropped.
Change-Id: Id50c52fbc7c891761dcabd654fb9b853b5f7a91d
---
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
8 files changed, 37 insertions(+), 18 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/56/1656/1
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java
index a376d53..707b5ba 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java
@@ -26,7 +26,7 @@
public Set<JobId> getJobIds();
- public IDatasetStateRecord getState(JobId jobId);
+ public long getStateTimestamp(JobId jobId);
public void deinitState(JobId jobId);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index c47a612..8722b92 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -155,7 +155,7 @@
ccContext = new ClusterControllerContext(topology);
sweeper = new DeadNodeSweeper();
datasetDirectoryService = new DatasetDirectoryService(ccConfig.getResultTTL(),
- ccConfig.getResultSweepThreshold());
+ ccConfig.getResultSweepThreshold(), preDistributedJobStore);
deploymentRunMap = new HashMap<>();
stateDumpRunMap = new HashMap<>();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
index c573ae8..117621f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java
@@ -62,6 +62,10 @@
return descriptor;
}
+ public boolean jobIsPredistributed(JobId jobId) {
+ return preDistributedJobDescriptorMap.get(jobId) != null;
+ }
+
public void removeDistributedJobDescriptor(JobId jobId) throws HyracksException {
PreDistributedJobDescriptor descriptor = preDistributedJobDescriptorMap.get(jobId);
if (descriptor == null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 8fd15f9..18d5eb9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -41,6 +41,7 @@
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.control.cc.PreDistributedJobStore;
import org.apache.hyracks.control.common.dataset.ResultStateSweeper;
import org.apache.hyracks.control.common.work.IResultCallback;
@@ -61,9 +62,13 @@
private final Map<JobId, JobResultInfo> jobResultLocations;
- public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) {
+ private final PreDistributedJobStore preDistributedJobStore;
+
+ public DatasetDirectoryService(long resultTTL, long resultSweepThreshold,
+ PreDistributedJobStore preDistributedJobStore) {
this.resultTTL = resultTTL;
this.resultSweepThreshold = resultSweepThreshold;
+ this.preDistributedJobStore = preDistributedJobStore;
jobResultLocations = new LinkedHashMap<JobId, JobResultInfo>();
}
@@ -178,8 +183,12 @@
}
@Override
- public synchronized IDatasetStateRecord getState(JobId jobId) {
- return getDatasetJobRecord(jobId);
+ public synchronized long getStateTimestamp(JobId jobId) {
+ IDatasetStateRecord state = getDatasetJobRecord(jobId);
+ if (preDistributedJobStore.jobIsPredistributed(jobId)){
+ return -1;
+ }
+ return state.getTimestamp();
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
index e7ac389..a151cb8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java
@@ -25,7 +25,6 @@
import java.util.logging.Logger;
import org.apache.hyracks.api.dataset.IDatasetManager;
-import org.apache.hyracks.api.dataset.IDatasetStateRecord;
import org.apache.hyracks.api.job.JobId;
/**
@@ -70,8 +69,8 @@
synchronized (datasetManager) {
toBeCollected.clear();
for (JobId jobId : datasetManager.getJobIds()) {
- final IDatasetStateRecord state = datasetManager.getState(jobId);
- if (state != null && System.currentTimeMillis() > state.getTimestamp() + resultTTL) {
+ final long timestamp = datasetManager.getStateTimestamp(jobId);
+ if (timestamp != -1 && System.currentTimeMillis() > timestamp + resultTTL) {
toBeCollected.add(jobId);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 9c28c16..9242bba 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -120,7 +120,7 @@
private final Map<JobId, Joblet> jobletMap;
- private final Map<JobId, ActivityClusterGraph> activityClusterGraphMap;
+ private final Map<JobId, ActivityClusterGraph> preDistributedJobActivityClusterGraphMap;
private ExecutorService executor;
@@ -180,7 +180,7 @@
lccm = new LifeCycleComponentManager();
workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
jobletMap = new Hashtable<>();
- activityClusterGraphMap = new Hashtable<>();
+ preDistributedJobActivityClusterGraphMap = new Hashtable<>();
timer = new Timer(true);
serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
new File(new File(NodeControllerService.class.getName()), id));
@@ -373,27 +373,27 @@
}
public void storeActivityClusterGraph(JobId jobId, ActivityClusterGraph acg) throws HyracksException {
- if (activityClusterGraphMap.get(jobId) != null) {
+ if (preDistributedJobActivityClusterGraphMap.get(jobId) != null) {
throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
}
- activityClusterGraphMap.put(jobId, acg);
+ preDistributedJobActivityClusterGraphMap.put(jobId, acg);
}
public void removeActivityClusterGraph(JobId jobId) throws HyracksException {
- if (activityClusterGraphMap.get(jobId) == null) {
+ if (preDistributedJobActivityClusterGraphMap.get(jobId) == null) {
throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId);
}
- activityClusterGraphMap.remove(jobId);
+ preDistributedJobActivityClusterGraphMap.remove(jobId);
}
public void checkForDuplicateDistributedJob(JobId jobId) throws HyracksException {
- if (activityClusterGraphMap.get(jobId) != null) {
+ if (preDistributedJobActivityClusterGraphMap.get(jobId) != null) {
throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId);
}
}
public ActivityClusterGraph getActivityClusterGraph(JobId jobId) throws HyracksException {
- return activityClusterGraphMap.get(jobId);
+ return preDistributedJobActivityClusterGraphMap.get(jobId);
}
public NetworkManager getNetworkManager() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 930a3e0..8af99d8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -178,8 +178,12 @@
}
@Override
- public synchronized IDatasetStateRecord getState(JobId jobId) {
- return partitionResultStateMap.get(jobId);
+ public synchronized long getStateTimestamp(JobId jobId) {
+ IDatasetStateRecord r = partitionResultStateMap.get(jobId);
+ if (r == null) {
+ return -1;
+ }
+ return r.getTimestamp();
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
index f911a75..fd14341 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
@@ -143,6 +143,9 @@
hcc.startJob(jobId2);
hcc.waitForCompletion(jobId2);
+ //wait five minutes
+ Thread.sleep(300000);
+
//run the second job again
hcc.startJob(jobId2);
hcc.waitForCompletion(jobId2);
--
To view, visit https://asterix-gerrit.ics.uci.edu/1656
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Id50c52fbc7c891761dcabd654fb9b853b5f7a91d
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <[email protected]>