[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-carbondata/pull/362

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/362#discussion_r90365594

    --- Diff: integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---
    @@ -101,47 +101,107 @@ object DistributionUtil {
        * Checking if the existing executors is greater than configured executors, if yes
        * returning configured executors.
        *
    -   * @param blockList
    +   * @param blockList total number of blocks in the identified segments
        * @param sparkContext
        * @return
        */
       def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable], sparkContext: SparkContext): Seq[String] = {
         val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava)
    -    var confExecutorsTemp: String = null
    -    if (sparkContext.getConf.contains("spark.executor.instances")) {
    -      confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
    -    } else if (sparkContext.getConf.contains("spark.dynamicAllocation.enabled")
    -               && sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
    -                 .equalsIgnoreCase("true")) {
    -      if (sparkContext.getConf.contains("spark.dynamicAllocation.maxExecutors")) {
    -        confExecutorsTemp = sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
    +    ensureExecutorsByNumberAndGetNodeList(nodeMapping, blockList, sparkContext)
    +  }
    +
    +  /**
    +   * This method will ensure that the required/configured number of executors are requested
    +   * for processing the identified blocks
    +   *
    +   * @param nodeMapping
    +   * @param blockList
    +   * @param sparkContext
    +   * @return
    +   */
    +  private def ensureExecutorsByNumberAndGetNodeList(nodeMapping: java.util.Map[String, java.util
    +  .List[Distributable]], blockList: Seq[Distributable],
    +    sparkContext: SparkContext): Seq[String] = {
    +    val nodesOfData = nodeMapping.size()
    +    val confExecutors: Int = getConfiguredExecutors(sparkContext)
    +    LOGGER.info("Executors configured : " + confExecutors)
    +    val requiredExecutors = if (nodesOfData < 1 || nodesOfData > confExecutors) {
    +      confExecutors
    +    } else if (confExecutors > nodesOfData) {
    +      // this case will come only if dynamic allocation is true
    +      var totalExecutorsToBeRequested = nodesOfData
    +      // If total number of blocks are greater than the nodes identified then ensure
    +      // that the configured number of max executors can be opened based on the difference of
    +      // block list size and nodes identified
    +      if (blockList.size > nodesOfData) {
    +        // e.g 1. blockList size = 40, nodesOfData = 3, confExecutors = 6, then all executors
    +        // need to be opened
    +        // 2. blockList size = 4, nodesOfData = 3, confExecutors = 6, then
    +        // total 4 executors need to be opened
    +        val extraExecutorsToBeRequested = blockList.size - nodesOfData
    +        if (extraExecutorsToBeRequested > confExecutors) {
    +          totalExecutorsToBeRequested = confExecutors
    +        } else {
    +          totalExecutorsToBeRequested = nodesOfData + extraExecutorsToBeRequested
    +        }
           }
    +      LOGGER.info("Total executors requested: " + totalExecutorsToBeRequested)
    +      totalExecutorsToBeRequested
    +    } else {
    +      nodesOfData
         }
    -    val confExecutors = if (null != confExecutorsTemp) {
    -      confExecutorsTemp.toInt
    +    val startTime = System.currentTimeMillis();
    +    if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
    +      // this case will come only if dynamic allocation is true
    +      CarbonContext
    +        .ensureExecutors(sparkContext, requiredExecutors, blockList.size, Map.empty)
         } else {
    -      1
    +      CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
         }
    -    val requiredExecutors = if (nodeMapping.size > confExecutors) {
    -      confExecutors
    +    getDistinctNodesList(sparkContext, requiredExecutors, startTime)
    +  }
    +
    +  /**
    +   * This method will return the configured executors
    +   *
    +   * @param sparkContext
    +   * @return
    +   */
    +  private def getConfiguredExecutors(sparkContext: SparkContext): Int = {
    +    var confExecutors: Int = 0
    +    if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
    +      // default value for spark.dynamicAllocation.maxExecutors is infinity
    +      confExecutors = sparkContext.getConf.getInt("spark.dynamicAllocation.maxExecutors", 1)
         } else {
    -      nodeMapping.size()
    +      // default value for spark.executor.instances is 2
    +      confExecutors = sparkContext.getConf.get
[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/362#discussion_r90033528

    --- Diff: integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---
    @@ -101,47 +101,107 @@ object DistributionUtil {
        * Checking if the existing executors is greater than configured executors, if yes
        * returning configured executors.
        *
    -   * @param blockList
    +   * @param blockList total number of blocks in the identified segments
        * @param sparkContext
        * @return
        */
       def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable], sparkContext: SparkContext): Seq[String] = {
         val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava)
    -    var confExecutorsTemp: String = null
    -    if (sparkContext.getConf.contains("spark.executor.instances")) {
    -      confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
    -    } else if (sparkContext.getConf.contains("spark.dynamicAllocation.enabled")
    -               && sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
    -                 .equalsIgnoreCase("true")) {
    -      if (sparkContext.getConf.contains("spark.dynamicAllocation.maxExecutors")) {
    -        confExecutorsTemp = sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
    +    ensureExecutorsByNumberAndGetNodeList(nodeMapping, blockList, sparkContext)
    +  }
    +
    +  /**
    +   * This method will ensure that the required/configured number of executors are requested
    +   * for processing the identified blocks
    +   *
    +   * @param nodeMapping
    +   * @param blockList
    +   * @param sparkContext
    +   * @return
    +   */
    +  private def ensureExecutorsByNumberAndGetNodeList(nodeMapping: java.util.Map[String, java.util
    +  .List[Distributable]], blockList: Seq[Distributable],
    --- End diff --

    please format the code style for parameter list correctly
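
To make the formatting request above concrete, here is a minimal sketch of the same signature laid out with one parameter per line and a four-space continuation indent, a layout commonly used in Spark-style Scala code. Whether this is exactly the layout the reviewer had in mind is an assumption. `Distributable` and `SparkContext` are hypothetical stand-ins so the snippet compiles on its own, the `private` modifier is dropped so the method can be called from outside, and the body is a placeholder rather than the PR's logic.

```scala
// Hypothetical stand-ins so the sketch compiles without Spark or CarbonData on the classpath.
trait Distributable
final class SparkContext

object DistributionUtilSketch {

  // One parameter per line, four-space continuation indent,
  // return type on the same line as the last parameter.
  def ensureExecutorsByNumberAndGetNodeList(
      nodeMapping: java.util.Map[String, java.util.List[Distributable]],
      blockList: Seq[Distributable],
      sparkContext: SparkContext): Seq[String] = {
    // Placeholder body (NOT the PR's logic): return the node names in sorted order.
    nodeMapping.keySet().toArray(new Array[String](0)).toSeq.sorted
  }
}
```

Compared with the diff, the type `java.util.Map[String, java.util.List[Distributable]]` is no longer split in the middle of `java.util.List`, which is the part of the original layout that is hardest to read.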
[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/362#discussion_r90033298

    --- Diff: integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---
[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/362#discussion_r90032032

    --- Diff: integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---
[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/362#discussion_r90032072

    --- Diff: integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---
[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/362#discussion_r90031584

    --- Diff: integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala ---
    @@ -208,4 +208,36 @@ object CarbonContext {
         }
       }
    +
    +  /**
    +   * Requesting the extra executors other than the existing ones.
    +   *
    +   * @param sc sparkContext
    +   * @param requiredExecutors required number of executors to be requested
    +   * @param localityAwareTasks The number of pending tasks which is locality required
    +   * @param hostToLocalTaskCount A map to store hostname with its possible task number running on it
    +   * @return
    +   */
    +  final def ensureExecutors(sc: SparkContext,
    --- End diff --

    Do not add this function here, add it to DistributionUtil, like in #365. It will be reused for spark2 integration
[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/362#discussion_r90031916

    --- Diff: integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---
[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/362#discussion_r90031795

    --- Diff: integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---
[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...
GitHub user manishgupta88 opened a pull request:

    https://github.com/apache/incubator-carbondata/pull/362

    [CARBONDATA-459] Block distribution is wrong in case of dynamic allocation=true

    Problem: Block distribution is wrong when dynamic allocation is enabled
    (dynamic allocation=true).

    Analysis: When dynamic allocation is enabled and the configured maximum
    number of executors is greater than the initial number of executors, Carbon
    is unable to request the configured maximum. As a result, when the number
    of blocks increases, block distribution stays limited to the number of
    nodes and fewer tasks are launched. Resources are under-utilized, which
    hurts query and load performance.

    Fix: Request the configured maximum number of executors when dynamic
    allocation is enabled.

    Impact area: Query and data load performance, due to under-utilization of
    resources.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/manishgupta88/incubator-carbondata dynamic_allocation_block_distribution

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-carbondata/pull/362.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #362

commit 0019dfc08ef589d75c45feecd8b559bca0b35f08
Author: manishgupta88
Date:   2016-11-28T10:07:11Z

    Problem: Block distribution is wrong when dynamic allocation is enabled
    (dynamic allocation=true).

    Analysis: When dynamic allocation is enabled and the configured maximum
    number of executors is greater than the initial number of executors, Carbon
    is unable to request the configured maximum. As a result, when the number
    of blocks increases, block distribution stays limited to the number of
    nodes and fewer tasks are launched. Resources are under-utilized, which
    hurts query and load performance.

    Fix: Request the configured maximum number of executors when dynamic
    allocation is enabled.

    Impact area: Query and data load performance, due to under-utilization of
    resources.
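
The executor count that the fix requests can be sketched as a pure function, which makes the two examples from the diff's comments easy to check. This is a minimal sketch, not the merged implementation: `ExecutorEstimate`, `asInDiff`, and `clamped` are hypothetical names, and the three `Int` parameters stand in for `nodeMapping.size()`, the result of `getConfiguredExecutors`, and `blockList.size`. `asInDiff` follows the branch structure visible in the (truncated) diff, while `clamped` is an assumed variant that never exceeds the configured maximum.

```scala
object ExecutorEstimate {

  /** Mirrors the branch structure of the diff under review. */
  def asInDiff(nodesOfData: Int, confExecutors: Int, blockCount: Int): Int = {
    if (nodesOfData < 1 || nodesOfData > confExecutors) {
      confExecutors
    } else if (confExecutors > nodesOfData && blockCount > nodesOfData) {
      val extra = blockCount - nodesOfData
      if (extra > confExecutors) confExecutors else nodesOfData + extra
    } else {
      nodesOfData
    }
  }

  /** Hypothetical variant: never ask for more than confExecutors. */
  def clamped(nodesOfData: Int, confExecutors: Int, blockCount: Int): Int = {
    if (nodesOfData < 1 || nodesOfData > confExecutors) confExecutors
    else math.min(confExecutors, math.max(nodesOfData, blockCount))
  }

  def main(args: Array[String]): Unit = {
    println(asInDiff(3, 6, 40)) // 6: first example in the diff comments
    println(asInDiff(3, 6, 4))  // 4: second example in the diff comments
    println(asInDiff(3, 6, 8))  // 8: more than the 6 configured executors
    println(clamped(3, 6, 8))   // 6
  }
}
```

The third call shows that, as far as the visible part of the diff goes, the sum `nodesOfData + extra` can exceed `confExecutors` whenever the extra block count is at most `confExecutors`. Whether a later step caps the request is not visible in the truncated diff, so the clamped variant is offered only as one possible way to keep the request within the configured maximum.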