Repository: carbondata Updated Branches: refs/heads/master 7b5a1c3b9 -> 0be69cd36
[CARBONDATA-1366] Add an option 'carbon.global.sort.rdd.storage.level' to configure the RDD storage level when sort_scope=global_sort. Add an option 'carbon.global.sort.rdd.storage.level' so that users can configure the RDD storage level to suit their environment; the default value of this option is MEMORY_ONLY. This closes #1245 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0be69cd3 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0be69cd3 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0be69cd3 Branch: refs/heads/master Commit: 0be69cd36cd875b35499ea78efb0297bf0377a93 Parents: 7b5a1c3 Author: Zhang Zhichao <441586...@qq.com> Authored: Tue Aug 8 17:42:40 2017 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Thu Aug 10 19:45:37 2017 +0800 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 16 +++++++++++ .../carbondata/core/util/CarbonProperties.java | 20 +++++++++++++- .../apache/carbondata/core/util/CarbonUtil.java | 28 ++++++++++++++++++++ .../load/DataLoadProcessBuilderOnSpark.scala | 4 ++- 4 files changed, 66 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/0be69cd3/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index c9f9373..f06eb83 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1304,6 +1304,22 @@ public final class CarbonCommonConstants { */ public 
static final String CARBON_USE_MULTI_TEMP_DIR_DEFAULT = "false"; + /** + * Which storage level to persist rdd when sort_scope=global_sort + */ + @CarbonProperty + public static final String CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL = + "carbon.global.sort.rdd.storage.level"; + + /** + * The default value(MEMORY_ONLY) is designed for executors with big memory, if user's executor + * has less memory, set the CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL to MEMORY_AND_DISK_SER or + * other storage level to correspond to different environment. + * You can get more recommendations about storage level in spark website: + * http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence. + */ + public static final String CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT = "MEMORY_ONLY"; + private CarbonCommonConstants() { } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/0be69cd3/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index 12776de..2620ecb 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -867,7 +867,7 @@ public final class CarbonProperties { CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT); boolean validateBoolean = CarbonUtil.validateBoolean(usingMultiDirStr); if (!validateBoolean) { - LOGGER.info("The carbon.use.multiple.temp.dir configuration value is invalid." + LOGGER.error("The carbon.use.multiple.temp.dir configuration value is invalid." + "Configured value: \"" + usingMultiDirStr + "\"." 
+ "Data Load will not use multiple temp directories."); usingMultiDirStr = CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT; @@ -876,6 +876,24 @@ public final class CarbonProperties { } /** + * Return valid storage level + * @return String + */ + public String getGlobalSortRddStorageLevel() { + String storageLevel = getProperty(CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL, + CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT); + boolean validateStorageLevel = CarbonUtil.isValidStorageLevel(storageLevel); + if (!validateStorageLevel) { + LOGGER.error("The " + CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL + + " configuration value is invalid. It will use default storage level(" + + CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT + + ") to persist rdd."); + storageLevel = CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT; + } + return storageLevel.toUpperCase(); + } + + /** * returns true if carbon property * @param key * @return http://git-wip-us.apache.org/repos/asf/carbondata/blob/0be69cd3/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 7628415..edc4c28 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -1714,6 +1714,34 @@ public final class CarbonUtil { } /** + * validate the storage level + * @param storageLevel + * @return boolean + */ + public static boolean isValidStorageLevel(String storageLevel) { + if (null == storageLevel || storageLevel.trim().equals("")) { + return false; + } + switch (storageLevel.toUpperCase()) { + case "DISK_ONLY": + case "DISK_ONLY_2": + case "MEMORY_ONLY": + case "MEMORY_ONLY_2": + case "MEMORY_ONLY_SER": 
+ case "MEMORY_ONLY_SER_2": + case "MEMORY_AND_DISK": + case "MEMORY_AND_DISK_2": + case "MEMORY_AND_DISK_SER": + case "MEMORY_AND_DISK_SER_2": + case "OFF_HEAP": + case "NONE": + return true; + default: + return false; + } + } + + /** * validate teh batch size * * @param value http://git-wip-us.apache.org/repos/asf/carbondata/blob/0be69cd3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala index 534ab88..fed8a96 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala @@ -32,6 +32,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.row.CarbonRow import org.apache.carbondata.core.statusmanager.LoadMetadataDetails +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.processing.csvload.{CSVInputFormat, StringArrayWritable} import org.apache.carbondata.processing.model.CarbonLoadModel import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder @@ -112,7 +113,8 @@ object DataLoadProcessBuilderOnSpark { // Because if the number of partitions greater than 1, there will be action operator(sample) in // sortBy operator. So here we cache the rdd to avoid do input and convert again. 
if (numPartitions > 1) { - convertRDD.persist(StorageLevel.MEMORY_AND_DISK) + convertRDD.persist(StorageLevel.fromString( + CarbonProperties.getInstance().getGlobalSortRddStorageLevel())) } import scala.reflect.classTag