Repository: carbondata
Updated Branches:
  refs/heads/master 9ee74fe07 -> 4c9bed8bc


[CARBONDATA-2307] Fix OOM issue when using DataFrame.coalesce

Fix OOM issue when using DataFrame.coalesce

This closes #2136


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4c9bed8b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4c9bed8b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4c9bed8b

Branch: refs/heads/master
Commit: 4c9bed8bc6a8b9a517fa7bbe1635bc3da209c45b
Parents: 9ee74fe
Author: Jin Zhou <xapr...@yeah.net>
Authored: Tue Apr 3 18:48:51 2018 +0800
Committer: manishgupta88 <tomanishgupt...@gmail.com>
Committed: Sat Apr 14 13:09:41 2018 +0530

----------------------------------------------------------------------
 .../datastore/chunk/AbstractRawColumnChunk.java |  4 +++-
 .../chunk/impl/AbstractDimensionColumnPage.java |  5 ++++-
 .../chunk/impl/DimensionRawColumnChunk.java     |  2 ++
 .../chunk/impl/MeasureRawColumnChunk.java       |  2 ++
 .../SafeAbsractDimensionDataChunkStore.java     |  4 +++-
 ...feVariableLengthDimensionDataChunkStore.java |  6 ++++++
 .../core/scan/result/BlockletScannedResult.java |  3 +++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 21 +++++++++++++++++---
 8 files changed, 41 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c9bed8b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
index 05ac9ff..af1c811 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
@@ -96,7 +96,9 @@ public abstract class AbstractRawColumnChunk {
     this.rowCount = rowCount;
   }
 
-  public abstract void freeMemory();
+  public void freeMemory() {
+    rawData = null;
+  }
 
   public int getColumnIndex() {
     return columnIndex;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c9bed8b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionColumnPage.java
index 6f316c5..91e55dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/AbstractDimensionColumnPage.java
@@ -77,7 +77,10 @@ public abstract class AbstractDimensionColumnPage implements DimensionColumnPage
    * below method will be used to free the allocated memory
    */
   @Override public void freeMemory() {
-    dataChunkStore.freeMemory();
+    if (dataChunkStore != null) {
+      dataChunkStore.freeMemory();
+      dataChunkStore = null;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c9bed8b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
index f9bb590..c7a8337 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
@@ -102,10 +102,12 @@ public class DimensionRawColumnChunk extends AbstractRawColumnChunk {
   }
 
   @Override public void freeMemory() {
+    super.freeMemory();
     if (null != dataChunks) {
       for (int i = 0; i < dataChunks.length; i++) {
         if (dataChunks[i] != null) {
           dataChunks[i].freeMemory();
+          dataChunks[i] = null;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c9bed8b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
index 5e8618b..2311887 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
@@ -102,10 +102,12 @@ public class MeasureRawColumnChunk extends AbstractRawColumnChunk {
   }
 
   @Override public void freeMemory() {
+    super.freeMemory();
     if (null != columnPages) {
       for (int i = 0; i < columnPages.length; i++) {
         if (columnPages[i] != null) {
           columnPages[i].freeMemory();
+          columnPages[i] = null;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c9bed8b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbsractDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbsractDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbsractDimensionDataChunkStore.java
index f7189e6..e9bf24b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbsractDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeAbsractDimensionDataChunkStore.java
@@ -72,7 +72,9 @@ public abstract class SafeAbsractDimensionDataChunkStore implements DimensionDat
    * Below method will be used to free the memory occupied by the column chunk
    */
   @Override public void freeMemory() {
-    // do nothing as GC will take care of freeing memory
+    data = null;
+    invertedIndex = null;
+    invertedIndexReverse = null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c9bed8b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
index 09230dd..bb9c888 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
@@ -184,4 +184,10 @@ public class SafeVariableLengthDimensionDataChunkStore extends SafeAbsractDimens
     return ByteUtil.UnsafeComparer.INSTANCE
         .compareTo(data, currentDataOffset, length, compareValue, 0, compareValue.length);
   }
+
+  @Override
+  public void freeMemory() {
+    super.freeMemory();
+    dataOffsets = null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c9bed8b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
index 29404b4..df403c5 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -499,6 +499,7 @@ public abstract class BlockletScannedResult {
           for (int j = 0; j < dimensionColumnPages[i].length; j++) {
             if (null != dimensionColumnPages[i][j]) {
               dimensionColumnPages[i][j].freeMemory();
+              dimensionColumnPages[i][j] = null;
             }
           }
         }
@@ -511,6 +512,7 @@ public abstract class BlockletScannedResult {
           for (int j = 0; j < measureColumnPages[i].length; j++) {
             if (null != measureColumnPages[i][j]) {
               measureColumnPages[i][j].freeMemory();
+              measureColumnPages[i][j] = null;
             }
           }
         }
@@ -521,6 +523,7 @@ public abstract class BlockletScannedResult {
       for (int i = 0; i < dimRawColumnChunks.length; i++) {
         if (null != dimRawColumnChunks[i]) {
           dimRawColumnChunks[i].freeMemory();
+          dimRawColumnChunks[i] = null;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c9bed8b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index e3a62b6..31d3715 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -413,7 +413,7 @@ class CarbonScanRDD[T: ClassTag](
       // one query id per table
       model.setQueryId(queryId)
       // get RecordReader by FileFormat
-      val reader: RecordReader[Void, Object] = inputSplit.getFileFormat match {
+      var reader: RecordReader[Void, Object] = inputSplit.getFileFormat match {
         case FileFormat.ROW_V1 =>
           // create record reader for row format
           DataTypeUtil.setDataTypeConverter(dataTypeConverterClz.newInstance())
@@ -445,11 +445,23 @@ class CarbonScanRDD[T: ClassTag](
           }
       }
 
+      val closeReader = () => {
+        if (reader != null) {
+          try {
+            reader.close()
+          } catch {
+            case e: Exception =>
+              LOGGER.error(e)
+          }
+          reader = null
+        }
+      }
+
       // add task completion before calling initialize as initialize method will internally call
       // for usage of unsafe method for processing of one blocklet and if there is any exception
       // while doing that the unsafe memory occupied for that task will not get cleared
       context.addTaskCompletionListener { _ =>
-        reader.close()
+        closeReader.apply()
         close()
         logStatistics(executionId, taskId, queryStartTime, model.getStatisticsRecorder, split)
       }
@@ -468,6 +480,9 @@ class CarbonScanRDD[T: ClassTag](
             finished = !reader.nextKeyValue
             havePair = !finished
           }
+          if (finished) {
+            closeReader.apply()
+          }
           !finished
         }
 
@@ -489,7 +504,6 @@ class CarbonScanRDD[T: ClassTag](
       }
     }
 
-
     iterator.asInstanceOf[Iterator[T]]
   }
 
@@ -727,4 +741,5 @@ class CarbonScanRDD[T: ClassTag](
   def setVectorReaderSupport(boolean: Boolean): Unit = {
     vectorReader = boolean
   }
+
 }

Reply via email to