Murtadha Hubail has submitted this change and it was merged.

Change subject: [ASTERIXDB-2271][RT] Remove Result Ref of Aborted Jobs
......................................................................


[ASTERIXDB-2271][RT] Remove Result Ref of Aborted Jobs

- user model changes: no
- storage format changes: no
- interface changes: yes
  - IDatasetPartitionManager (-) abortAllReaders

Details:
- Currently, there is a possibility of reusing the same
  result reference for two different jobs. This change
  fixes this issue by removing the old result references
  of aborted jobs.
- Abort job tasks before aborting result readers to stop
  result generation.

Change-Id: I8170887e007d63b143ef08a3a8e149ab3866fcb1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2386
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>
---
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
3 files changed, 20 insertions(+), 33 deletions(-)

Approvals:
  Anon. E. Moose #1000171: 
  Jenkins: Verified; No violations found; ; Verified
  Michael Blow: Looks good to me, approved



diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
index e6cf6d3..b1e203f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -39,8 +39,6 @@
 
     void abortReader(JobId jobId);
 
-    void abortAllReaders();
-
     void close();
 
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index fb7308e..b7cf9a4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -18,7 +18,7 @@
  */
 package org.apache.hyracks.control.nc.dataset;
 
-import java.util.LinkedHashMap;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
@@ -65,7 +65,7 @@
         } else {
             datasetMemoryManager = null;
         }
-        partitionResultStateMap = new LinkedHashMap<>();
+        partitionResultStateMap = new HashMap<>();
         executor.execute(new ResultStateSweeper(this, resultSweepThreshold, 
LOGGER));
     }
 
@@ -77,14 +77,11 @@
         synchronized (this) {
             dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, 
asyncMode, orderedResult, partition, nPartitions,
                     datasetMemoryManager, fileFactory, maxReads);
-
             ResultSetMap rsIdMap = 
partitionResultStateMap.computeIfAbsent(jobId, k -> new ResultSetMap());
-
             ResultState[] resultStates = rsIdMap.createOrGetResultStates(rsId, 
nPartitions);
             resultStates[partition] = dpw.getResultState();
         }
-
-        LOGGER.debug("Initialized partition writer: JobId: " + jobId + 
":partition: " + partition);
+        LOGGER.debug("Initialized partition writer: JobId: {}:partition: {}", 
jobId, partition);
         return dpw;
     }
 
@@ -103,8 +100,8 @@
     @Override
     public void reportPartitionWriteCompletion(JobId jobId, ResultSetId rsId, 
int partition) throws HyracksException {
         try {
-            LOGGER.debug("Reporting partition write completion: JobId: " + 
jobId + ": ResultSetId: " + rsId
-                    + ":partition: " + partition);
+            LOGGER.debug("Reporting partition write completion: JobId: 
{}:ResultSetId: {}:partition: {}", jobId, rsId,
+                    partition);
             
ncs.getClusterController(jobId.getCcId()).reportResultPartitionWriteCompletion(jobId,
 rsId, partition);
         } catch (Exception e) {
             throw HyracksException.create(e);
@@ -117,11 +114,11 @@
         ResultState resultState = getResultState(jobId, resultSetId, 
partition);
         DatasetPartitionReader dpr = new DatasetPartitionReader(this, 
datasetMemoryManager, executor, resultState);
         dpr.writeTo(writer);
-        LOGGER.debug("Initialized partition reader: JobId: " + jobId + 
":ResultSetId: " + resultSetId + ":partition: "
-                + partition);
+        LOGGER.debug("Initialized partition reader: JobId: {}:ResultSetId: 
{}:partition: {}", jobId, resultSetId,
+                partition);
     }
 
-    protected synchronized ResultState getResultState(JobId jobId, ResultSetId 
resultSetId, int partition)
+    private synchronized ResultState getResultState(JobId jobId, ResultSetId 
resultSetId, int partition)
             throws HyracksException {
         ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
         if (rsIdMap == null) {
@@ -155,13 +152,6 @@
     }
 
     @Override
-    public synchronized void abortAllReaders() {
-        for (ResultSetMap rsIdMap : partitionResultStateMap.values()) {
-            rsIdMap.abortAll();
-        }
-    }
-
-    @Override
     public synchronized void close() {
         for (JobId jobId : getJobIds()) {
             deinit(jobId);
@@ -175,7 +165,7 @@
     }
 
     @Override
-    public ResultSetMap getState(JobId jobId) {
+    public synchronized ResultSetMap getState(JobId jobId) {
         return partitionResultStateMap.get(jobId);
     }
 
@@ -191,5 +181,4 @@
             rsIdMap.closeAndDeleteAll();
         }
     }
-
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
index 68d677f..2bcf414 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
@@ -22,6 +22,7 @@
 
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 import org.apache.hyracks.control.nc.Joblet;
@@ -50,19 +51,18 @@
             LOGGER.log(Level.WARN, "DatasetPartitionManager is null on " + 
ncs.getId());
         }
         Collection<Joblet> joblets = ncs.getJobletMap().values();
-        for (Joblet ji : joblets) {
-            // TODO(mblow): should we have one jobletmap per cc?
-            if (!ji.getJobId().getCcId().equals(ccId)) {
-                continue;
-            }
-            if (dpm != null) {
-                dpm.abortReader(ji.getJobId());
-            }
-            Collection<Task> tasks = ji.getTaskMap().values();
+        // TODO(mblow): should we have one jobletmap per cc?
+        joblets.stream().filter(joblet -> 
joblet.getJobId().getCcId().equals(ccId)).forEach(joblet -> {
+            Collection<Task> tasks = joblet.getTaskMap().values();
             for (Task task : tasks) {
                 task.abort();
             }
-            ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, 
ji.getJobId(), JobStatus.FAILURE));
-        }
+            final JobId jobId = joblet.getJobId();
+            if (dpm != null) {
+                dpm.abortReader(jobId);
+                dpm.sweep(jobId);
+            }
+            ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, jobId, 
JobStatus.FAILURE));
+        });
     }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2386
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I8170887e007d63b143ef08a3a8e149ab3866fcb1
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <mhub...@apache.org>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mb...@apache.org>
Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>

Reply via email to