carbondata git commit: [CARBONDATA-2143] Fixed query memory leak issue for task failure during initialization of record reader
Repository: carbondata Updated Branches: refs/heads/branch-1.3 2c5ecfbfe -> 8b105a1e1 [CARBONDATA-2143] Fixed query memory leak issue for task failure during initialization of record reader Problem: Whenever a query is executed, in the internalCompute method of CarbonScanRdd class record reader is initialized. A task completion listener is attached to each task after initialization of the record reader. During record reader initialization, queryResultIterator is initialized and one blocklet is processed. The blocklet processed will use available unsafe memory. Let's say there are 100 columns and 80 columns get the space but there is no space left for the remaining columns to be stored in the unsafe memory. This will result in a memory exception and record reader initialization will fail leading to failure in query. In the above case the unsafe memory allocated for 80 columns will not be freed and will always remain occupied as long as the JVM process persists. Impact: It is a memory leak in the system and can lead to query failures for queries executed after one query fails due to the above reason. Solution: Attach the task completion listener before record reader initialization so that if the query fails at the very first instance after using unsafe memory, still that memory will be cleared. 
This closes #1948 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8b105a1e Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8b105a1e Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8b105a1e Branch: refs/heads/branch-1.3 Commit: 8b105a1e1f6e7e7e3b0bc13d44c1bf93fd821e31 Parents: 2c5ecfb Author: m00258959 Authored: Wed Feb 7 12:07:33 2018 +0530 Committer: ravipesala Committed: Thu Feb 8 22:54:48 2018 +0530 -- .../executor/impl/AbstractQueryExecutor.java| 14 +++- .../carbondata/hadoop/AbstractRecordReader.java | 8 +++-- .../carbondata/spark/rdd/CarbonScanRDD.scala| 38 +++- 3 files changed, 40 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b105a1e/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java index 6875f35..6490694 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java @@ -586,9 +586,17 @@ public abstract class AbstractQueryExecutor implements QueryExecutor { */ @Override public void finish() throws QueryExecutionException { CarbonUtil.clearBlockCache(queryProperties.dataBlocks); +Throwable exceptionOccurred = null; if (null != queryIterator) { - queryIterator.close(); + // catch if there is any exception so that it can be rethrown after clearing all the resources + // else if any exception is thrown from this point executor service will not be terminated + try { +queryIterator.close(); + } catch (Throwable e) { +exceptionOccurred = e; + } } +// clear all the unsafe memory used for the given task ID 
UnsafeMemoryManager.INSTANCE.freeMemoryAll(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId()); if (null != queryProperties.executorService) { // In case of limit query when number of limit records is already found so executors @@ -596,6 +604,10 @@ public abstract class AbstractQueryExecutor implements QueryExecutor { // the query performance. queryProperties.executorService.shutdownNow(); } +// if there is any exception re throw the exception +if (null != exceptionOccurred) { + throw new QueryExecutionException(exceptionOccurred); +} } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8b105a1e/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java -- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java index 62a97f9..bd4bbce 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java @@ -36,8 +36,10 @@ public abstract class AbstractRecordReader extends RecordReader { */ public void logStatistics(int recordCount, QueryStatisticsRecorder recorder) { // result size -
carbondata git commit: [CARBONDATA-2143] Fixed query memory leak issue for task failure during initialization of record reader
Repository: carbondata Updated Branches: refs/heads/master d94d21d63 -> 4a2d79927 [CARBONDATA-2143] Fixed query memory leak issue for task failure during initialization of record reader Problem: Whenever a query is executed, in the internalCompute method of CarbonScanRdd class record reader is initialized. A task completion listener is attached to each task after initialization of the record reader. During record reader initialization, queryResultIterator is initialized and one blocklet is processed. The blocklet processed will use available unsafe memory. Let's say there are 100 columns and 80 columns get the space but there is no space left for the remaining columns to be stored in the unsafe memory. This will result in a memory exception and record reader initialization will fail leading to failure in query. In the above case the unsafe memory allocated for 80 columns will not be freed and will always remain occupied as long as the JVM process persists. Impact: It is a memory leak in the system and can lead to query failures for queries executed after one query fails due to the above reason. Solution: Attach the task completion listener before record reader initialization so that if the query fails at the very first instance after using unsafe memory, still that memory will be cleared. 
This closes #1948 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4a2d7992 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4a2d7992 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4a2d7992 Branch: refs/heads/master Commit: 4a2d799272391e1c5d06416cbb9bdb6454488753 Parents: d94d21d Author: m00258959 Authored: Wed Feb 7 12:07:33 2018 +0530 Committer: ravipesala Committed: Thu Feb 8 22:54:00 2018 +0530 -- .../executor/impl/AbstractQueryExecutor.java| 14 +++- .../carbondata/hadoop/AbstractRecordReader.java | 8 +++-- .../carbondata/spark/rdd/CarbonScanRDD.scala| 38 +++- 3 files changed, 40 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a2d7992/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java index 6875f35..6490694 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java @@ -586,9 +586,17 @@ public abstract class AbstractQueryExecutor implements QueryExecutor { */ @Override public void finish() throws QueryExecutionException { CarbonUtil.clearBlockCache(queryProperties.dataBlocks); +Throwable exceptionOccurred = null; if (null != queryIterator) { - queryIterator.close(); + // catch if there is any exception so that it can be rethrown after clearing all the resources + // else if any exception is thrown from this point executor service will not be terminated + try { +queryIterator.close(); + } catch (Throwable e) { +exceptionOccurred = e; + } } +// clear all the unsafe memory used for the given task ID 
UnsafeMemoryManager.INSTANCE.freeMemoryAll(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId()); if (null != queryProperties.executorService) { // In case of limit query when number of limit records is already found so executors @@ -596,6 +604,10 @@ public abstract class AbstractQueryExecutor implements QueryExecutor { // the query performance. queryProperties.executorService.shutdownNow(); } +// if there is any exception re throw the exception +if (null != exceptionOccurred) { + throw new QueryExecutionException(exceptionOccurred); +} } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4a2d7992/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java -- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java index 62a97f9..bd4bbce 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java @@ -36,8 +36,10 @@ public abstract class AbstractRecordReader extends RecordReader { */ public void logStatistics(int recordCount, QueryStatisticsRecorder recorder) { // result size -Quer