Steven Jacobs has submitted this change and it was merged.

Change subject: Fix Pre-Distributed Jobs
......................................................................
Fix Pre-Distributed Jobs This fix prevents ResultStateSweeper from deinitializing pre-distributed jobs before they are dropped. Also fixes issues with new lock manager code Change-Id: Id50c52fbc7c891761dcabd654fb9b853b5f7a91d Reviewed-on: https://asterix-gerrit.ics.uci.edu/1656 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java 9 files changed, 54 insertions(+), 16 deletions(-) Approvals: Michael Blow: Looks good to me, approved Jenkins: Verified; No violations found; No violations found; Verified diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 59b3e88..88d07e4 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -1735,7 +1735,12 @@ } }; if (compileOnly) { - return compiler.compile(); + locker.lock(); + try { + return compiler.compile(); + } finally { + locker.unlock(); + } } if (stmtInsertUpsert.getReturnExpression() != null) { diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java index a376d53..c8463d3 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetManager.java @@ -29,4 +29,6 @@ public IDatasetStateRecord getState(JobId jobId); public void deinitState(JobId jobId); + + public long getResultTimestamp(JobId jobId); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java index c47a612..8722b92 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java @@ -155,7 +155,7 @@ ccContext = new ClusterControllerContext(topology); sweeper = new DeadNodeSweeper(); datasetDirectoryService = new DatasetDirectoryService(ccConfig.getResultTTL(), - ccConfig.getResultSweepThreshold()); + 
ccConfig.getResultSweepThreshold(), preDistributedJobStore); deploymentRunMap = new HashMap<>(); stateDumpRunMap = new HashMap<>(); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java index c573ae8..117621f 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/PreDistributedJobStore.java @@ -62,6 +62,10 @@ return descriptor; } + public boolean jobIsPredistributed(JobId jobId) { + return preDistributedJobDescriptorMap.get(jobId) != null; + } + public void removeDistributedJobDescriptor(JobId jobId) throws HyracksException { PreDistributedJobDescriptor descriptor = preDistributedJobDescriptorMap.get(jobId); if (descriptor == null) { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java index 8fd15f9..3cc41c5 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java @@ -41,6 +41,7 @@ import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.control.cc.PreDistributedJobStore; import org.apache.hyracks.control.common.dataset.ResultStateSweeper; 
import org.apache.hyracks.control.common.work.IResultCallback; @@ -61,9 +62,13 @@ private final Map<JobId, JobResultInfo> jobResultLocations; - public DatasetDirectoryService(long resultTTL, long resultSweepThreshold) { + private final PreDistributedJobStore preDistributedJobStore; + + public DatasetDirectoryService(long resultTTL, long resultSweepThreshold, + PreDistributedJobStore preDistributedJobStore) { this.resultTTL = resultTTL; this.resultSweepThreshold = resultSweepThreshold; + this.preDistributedJobStore = preDistributedJobStore; jobResultLocations = new LinkedHashMap<JobId, JobResultInfo>(); } @@ -178,11 +183,19 @@ } @Override - public synchronized IDatasetStateRecord getState(JobId jobId) { + public IDatasetStateRecord getState(JobId jobId) { return getDatasetJobRecord(jobId); } @Override + public synchronized long getResultTimestamp(JobId jobId) { + if (preDistributedJobStore.jobIsPredistributed(jobId)){ + return -1; + } + return getState(jobId).getTimestamp(); + } + + @Override public synchronized void deinitState(JobId jobId) { jobResultLocations.remove(jobId); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java index e7ac389..da1714b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java @@ -25,7 +25,6 @@ import java.util.logging.Logger; import org.apache.hyracks.api.dataset.IDatasetManager; -import org.apache.hyracks.api.dataset.IDatasetStateRecord; import org.apache.hyracks.api.job.JobId; /** @@ -70,8 +69,8 @@ synchronized (datasetManager) { 
toBeCollected.clear(); for (JobId jobId : datasetManager.getJobIds()) { - final IDatasetStateRecord state = datasetManager.getState(jobId); - if (state != null && System.currentTimeMillis() > state.getTimestamp() + resultTTL) { + final long timestamp = datasetManager.getResultTimestamp(jobId); + if (timestamp != -1 && System.currentTimeMillis() > timestamp + resultTTL) { toBeCollected.add(jobId); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 9c28c16..9242bba 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -120,7 +120,7 @@ private final Map<JobId, Joblet> jobletMap; - private final Map<JobId, ActivityClusterGraph> activityClusterGraphMap; + private final Map<JobId, ActivityClusterGraph> preDistributedJobActivityClusterGraphMap; private ExecutorService executor; @@ -180,7 +180,7 @@ lccm = new LifeCycleComponentManager(); workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread. 
jobletMap = new Hashtable<>(); - activityClusterGraphMap = new Hashtable<>(); + preDistributedJobActivityClusterGraphMap = new Hashtable<>(); timer = new Timer(true); serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(NodeControllerService.class.getName()), id)); @@ -373,27 +373,27 @@ } public void storeActivityClusterGraph(JobId jobId, ActivityClusterGraph acg) throws HyracksException { - if (activityClusterGraphMap.get(jobId) != null) { + if (preDistributedJobActivityClusterGraphMap.get(jobId) != null) { throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId); } - activityClusterGraphMap.put(jobId, acg); + preDistributedJobActivityClusterGraphMap.put(jobId, acg); } public void removeActivityClusterGraph(JobId jobId) throws HyracksException { - if (activityClusterGraphMap.get(jobId) == null) { + if (preDistributedJobActivityClusterGraphMap.get(jobId) == null) { throw HyracksException.create(ErrorCode.ERROR_FINDING_DISTRIBUTED_JOB, jobId); } - activityClusterGraphMap.remove(jobId); + preDistributedJobActivityClusterGraphMap.remove(jobId); } public void checkForDuplicateDistributedJob(JobId jobId) throws HyracksException { - if (activityClusterGraphMap.get(jobId) != null) { + if (preDistributedJobActivityClusterGraphMap.get(jobId) != null) { throw HyracksException.create(ErrorCode.DUPLICATE_DISTRIBUTED_JOB, jobId); } } public ActivityClusterGraph getActivityClusterGraph(JobId jobId) throws HyracksException { - return activityClusterGraphMap.get(jobId); + return preDistributedJobActivityClusterGraphMap.get(jobId); } public NetworkManager getNetworkManager() { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java index 930a3e0..c7b563b 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java @@ -178,11 +178,20 @@ } @Override - public synchronized IDatasetStateRecord getState(JobId jobId) { + public IDatasetStateRecord getState(JobId jobId) { return partitionResultStateMap.get(jobId); } @Override + public synchronized long getResultTimestamp(JobId jobId) { + IDatasetStateRecord r = getState(jobId); + if (r == null) { + return -1; + } + return r.getTimestamp(); + } + + @Override public synchronized void deinitState(JobId jobId) { deinit(jobId); partitionResultStateMap.remove(jobId); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java index f911a75..4a01fdb 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java @@ -78,6 +78,7 @@ ncConfig1.setClusterListenAddress("127.0.0.1"); ncConfig1.setDataListenAddress("127.0.0.1"); ncConfig1.setResultListenAddress("127.0.0.1"); + ncConfig1.setResultSweepThreshold(5000); ncConfig1.setIODevices(new String [] { joinPath(System.getProperty("user.dir"), "target", "data", "device0") }); NodeControllerService nc1Base = new NodeControllerService(ncConfig1); nc1 = Mockito.spy(nc1Base); @@ -89,6 +90,7 @@ ncConfig2.setClusterListenAddress("127.0.0.1"); ncConfig2.setDataListenAddress("127.0.0.1"); 
ncConfig2.setResultListenAddress("127.0.0.1"); + ncConfig2.setResultSweepThreshold(5000); ncConfig2.setIODevices(new String [] { joinPath(System.getProperty("user.dir"), "target", "data", "device1") }); NodeControllerService nc2Base = new NodeControllerService(ncConfig2); nc2 = Mockito.spy(nc2Base); @@ -143,6 +145,10 @@ hcc.startJob(jobId2); hcc.waitForCompletion(jobId2); + //wait ten seconds to ensure the result sweeper does not break the job + //The result sweeper runs every 5 seconds during the tests + Thread.sleep(10000); + //run the second job again hcc.startJob(jobId2); hcc.waitForCompletion(jobId2); -- To view, visit https://asterix-gerrit.ics.uci.edu/1656 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Id50c52fbc7c891761dcabd654fb9b853b5f7a91d Gerrit-PatchSet: 9 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Steven Jacobs <[email protected]> Gerrit-Reviewer: Ildar Absalyamov <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Steven Jacobs <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: Xikui Wang <[email protected]> Gerrit-Reviewer: Yingyi Bu <[email protected]>
