Repository: carbondata
Updated Branches:
  refs/heads/master 7b5a1c3b9 -> 0be69cd36


[CARBONDATA-1366] Add an option 'carbon.global.sort.rdd.storage.level' to
configure the RDD storage level when sort_scope=global_sort

Add an option 'carbon.global.sort.rdd.storage.level' so that users can
configure the RDD storage level to suit their environment. The default value
of this option is MEMORY_ONLY.

This closes #1245


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0be69cd3
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0be69cd3
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0be69cd3

Branch: refs/heads/master
Commit: 0be69cd36cd875b35499ea78efb0297bf0377a93
Parents: 7b5a1c3
Author: Zhang Zhichao <441586...@qq.com>
Authored: Tue Aug 8 17:42:40 2017 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Thu Aug 10 19:45:37 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   | 16 +++++++++++
 .../carbondata/core/util/CarbonProperties.java  | 20 +++++++++++++-
 .../apache/carbondata/core/util/CarbonUtil.java | 28 ++++++++++++++++++++
 .../load/DataLoadProcessBuilderOnSpark.scala    |  4 ++-
 4 files changed, 66 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0be69cd3/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index c9f9373..f06eb83 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1304,6 +1304,22 @@ public final class CarbonCommonConstants {
    */
   public static final String CARBON_USE_MULTI_TEMP_DIR_DEFAULT = "false";
 
+  /**
+   * Which storage level to persist rdd when sort_scope=global_sort
+   */
+  @CarbonProperty
+  public static final String CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL =
+      "carbon.global.sort.rdd.storage.level";
+
+  /**
+   * The default value(MEMORY_ONLY) is designed for executors with big memory, 
if user's executor
+   * has less memory, set the CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL to 
MEMORY_AND_DISK_SER or
+   * other storage level to correspond to different environment.
+   * You can get more recommendations about storage level in spark website:
+   * 
http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence.
+   */
+  public static final String CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT = 
"MEMORY_ONLY";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0be69cd3/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 12776de..2620ecb 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -867,7 +867,7 @@ public final class CarbonProperties {
         CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT);
     boolean validateBoolean = CarbonUtil.validateBoolean(usingMultiDirStr);
     if (!validateBoolean) {
-      LOGGER.info("The carbon.use.multiple.temp.dir configuration value is 
invalid."
+      LOGGER.error("The carbon.use.multiple.temp.dir configuration value is 
invalid."
           + "Configured value: \"" + usingMultiDirStr + "\"."
           + "Data Load will not use multiple temp directories.");
       usingMultiDirStr = 
CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT;
@@ -876,6 +876,24 @@ public final class CarbonProperties {
   }
 
   /**
+   * Return valid storage level
+   * @return String
+   */
+  public String getGlobalSortRddStorageLevel() {
+    String storageLevel = 
getProperty(CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL,
+        CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT);
+    boolean validateStorageLevel = 
CarbonUtil.isValidStorageLevel(storageLevel);
+    if (!validateStorageLevel) {
+      LOGGER.error("The " + 
CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL
+          + " configuration value is invalid. It will use default storage 
level("
+          + CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT
+          + ") to persist rdd.");
+      storageLevel = 
CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT;
+    }
+    return storageLevel.toUpperCase();
+  }
+
+  /**
    * returns true if carbon property
    * @param key
    * @return

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0be69cd3/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 7628415..edc4c28 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1714,6 +1714,34 @@ public final class CarbonUtil {
   }
 
   /**
+   * validate the storage level
+   * @param storageLevel
+   * @return boolean
+   */
+  public static boolean isValidStorageLevel(String storageLevel) {
+    if (null == storageLevel || storageLevel.trim().equals("")) {
+      return false;
+    }
+    switch (storageLevel.toUpperCase()) {
+      case "DISK_ONLY":
+      case "DISK_ONLY_2":
+      case "MEMORY_ONLY":
+      case "MEMORY_ONLY_2":
+      case "MEMORY_ONLY_SER":
+      case "MEMORY_ONLY_SER_2":
+      case "MEMORY_AND_DISK":
+      case "MEMORY_AND_DISK_2":
+      case "MEMORY_AND_DISK_SER":
+      case "MEMORY_AND_DISK_SER_2":
+      case "OFF_HEAP":
+      case "NONE":
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  /**
    * validate teh batch size
    *
    * @param value

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0be69cd3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 534ab88..fed8a96 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -32,6 +32,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.processing.csvload.{CSVInputFormat, 
StringArrayWritable}
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder
@@ -112,7 +113,8 @@ object DataLoadProcessBuilderOnSpark {
     // Because if the number of partitions greater than 1, there will be 
action operator(sample) in
     // sortBy operator. So here we cache the rdd to avoid do input and convert 
again.
     if (numPartitions > 1) {
-      convertRDD.persist(StorageLevel.MEMORY_AND_DISK)
+      convertRDD.persist(StorageLevel.fromString(
+        CarbonProperties.getInstance().getGlobalSortRddStorageLevel()))
     }
 
     import scala.reflect.classTag

Reply via email to