[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...

2016-12-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-carbondata/pull/362




[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...

2016-11-30 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/362#discussion_r90365594
  
--- Diff: integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---
@@ -101,47 +101,107 @@ object DistributionUtil {
    * Checking if the existing executors is greater than configured executors, if yes
    * returning configured executors.
    *
-   * @param blockList
+   * @param blockList total number of blocks in the identified segments
    * @param sparkContext
    * @return
    */
   def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
       sparkContext: SparkContext): Seq[String] = {
     val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava)
-    var confExecutorsTemp: String = null
-    if (sparkContext.getConf.contains("spark.executor.instances")) {
-      confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
-    } else if (sparkContext.getConf.contains("spark.dynamicAllocation.enabled")
-               && sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
-                 .equalsIgnoreCase("true")) {
-      if (sparkContext.getConf.contains("spark.dynamicAllocation.maxExecutors")) {
-        confExecutorsTemp = sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
+    ensureExecutorsByNumberAndGetNodeList(nodeMapping, blockList, sparkContext)
+  }
+
+  /**
+   * This method will ensure that the required/configured number of executors are requested
+   * for processing the identified blocks
+   *
+   * @param nodeMapping
+   * @param blockList
+   * @param sparkContext
+   * @return
+   */
+  private def ensureExecutorsByNumberAndGetNodeList(nodeMapping: java.util.Map[String, java.util
+      .List[Distributable]], blockList: Seq[Distributable],
+      sparkContext: SparkContext): Seq[String] = {
+    val nodesOfData = nodeMapping.size()
+    val confExecutors: Int = getConfiguredExecutors(sparkContext)
+    LOGGER.info("Executors configured : " + confExecutors)
+    val requiredExecutors = if (nodesOfData < 1 || nodesOfData > confExecutors) {
+      confExecutors
+    } else if (confExecutors > nodesOfData) {
+      // this case will come only if dynamic allocation is true
+      var totalExecutorsToBeRequested = nodesOfData
+      // If total number of blocks are greater than the nodes identified then ensure
+      // that the configured number of max executors can be opened based on the difference of
+      // block list size and nodes identified
+      if (blockList.size > nodesOfData) {
+        // e.g 1. blockList size = 40, nodesOfData = 3, confExecutors = 6, then all executors
+        // need to be opened
+        // 2. blockList size = 4, nodesOfData = 3, confExecutors = 6, then
+        // total 4 executors need to be opened
+        val extraExecutorsToBeRequested = blockList.size - nodesOfData
+        if (extraExecutorsToBeRequested > confExecutors) {
+          totalExecutorsToBeRequested = confExecutors
+        } else {
+          totalExecutorsToBeRequested = nodesOfData + extraExecutorsToBeRequested
+        }
       }
+      LOGGER.info("Total executors requested: " + totalExecutorsToBeRequested)
+      totalExecutorsToBeRequested
+    } else {
+      nodesOfData
     }
 
-    val confExecutors = if (null != confExecutorsTemp) {
-      confExecutorsTemp.toInt
+    val startTime = System.currentTimeMillis();
+    if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+      // this case will come only if dynamic allocation is true
+      CarbonContext
+        .ensureExecutors(sparkContext, requiredExecutors, blockList.size, Map.empty)
     } else {
-      1
+      CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
     }
-    val requiredExecutors = if (nodeMapping.size > confExecutors) {
-      confExecutors
+    getDistinctNodesList(sparkContext, requiredExecutors, startTime)
+  }
+
+  /**
+   * This method will return the configured executors
+   *
+   * @param sparkContext
+   * @return
+   */
+  private def getConfiguredExecutors(sparkContext: SparkContext): Int = {
+    var confExecutors: Int = 0
+    if (sparkContext.getConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+      // default value for spark.dynamicAllocation.maxExecutors is infinity
+      confExecutors = sparkContext.getConf.getInt("spark.dynamicAllocation.maxExecutors", 1)
+    } else {
+      // default value for spark.executor.instances is 2
+      confExecutors = sparkContext.getConf.get

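For readers skimming the hunk above: the sizing rule it implements can be distilled into a small standalone function. The sketch below is illustrative only — it mirrors the arithmetic of ensureExecutorsByNumberAndGetNodeList with plain integers standing in for blockList.size, nodeMapping.size() and the SparkConf lookups; the object and parameter names are made up for the example and are not CarbonData code.

// Standalone sketch (not CarbonData code) of the executor-sizing rule above.
// blockCount ~ blockList.size, dataNodeCount ~ nodeMapping.size(),
// confExecutors ~ getConfiguredExecutors(sparkContext).
object ExecutorSizingSketch {

  def requiredExecutors(blockCount: Int, dataNodeCount: Int, confExecutors: Int): Int = {
    if (dataNodeCount < 1 || dataNodeCount > confExecutors) {
      // no locality info, or more data nodes than configured executors:
      // fall back to the configured count
      confExecutors
    } else if (confExecutors > dataNodeCount) {
      // dynamic-allocation case: possibly request more executors than data nodes
      if (blockCount > dataNodeCount) {
        val extra = blockCount - dataNodeCount
        if (extra > confExecutors) confExecutors else dataNodeCount + extra
      } else {
        dataNodeCount
      }
    } else {
      // confExecutors == dataNodeCount
      dataNodeCount
    }
  }

  def main(args: Array[String]): Unit = {
    // the two worked examples from the inline comments in the diff:
    println(requiredExecutors(blockCount = 40, dataNodeCount = 3, confExecutors = 6)) // 6
    println(requiredExecutors(blockCount = 4, dataNodeCount = 3, confExecutors = 6))  // 4
  }
}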
[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...

2016-11-29 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/362#discussion_r90033528
  
--- Diff: integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---
(same hunk as quoted in full above, at the wrapped parameter list of ensureExecutorsByNumberAndGetNodeList:)

+  private def ensureExecutorsByNumberAndGetNodeList(nodeMapping: java.util.Map[String, java.util
+      .List[Distributable]], blockList: Seq[Distributable],
--- End diff --

Please format the parameter list according to the code style.
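For illustration, one common Scala layout for such a signature — one parameter per line with a double-indented continuation; the project's own style rules take precedence — would be:

  private def ensureExecutorsByNumberAndGetNodeList(
      nodeMapping: java.util.Map[String, java.util.List[Distributable]],
      blockList: Seq[Distributable],
      sparkContext: SparkContext): Seq[String] = {
    // body unchanged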




[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...

2016-11-29 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/362#discussion_r90033298
  
--- Diff: integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---
(same hunk as quoted in full above)

[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...

2016-11-29 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/362#discussion_r90032032
  
--- Diff: integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---
(same hunk as quoted in full above)

[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...

2016-11-29 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/362#discussion_r90032072
  
--- Diff: integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---
(same hunk as quoted in full above)

[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...

2016-11-29 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/362#discussion_r90031584
  
--- Diff: integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala ---
@@ -208,4 +208,36 @@ object CarbonContext {
     }
 
   }
+
+  /**
+   * Requesting the extra executors other than the existing ones.
+   *
+   * @param sc                   sparkContext
+   * @param requiredExecutors    required number of executors to be requested
+   * @param localityAwareTasks   The number of pending tasks which is locality required
+   * @param hostToLocalTaskCount A map to store hostname with its possible task number running on it
+   * @return
+   */
+  final def ensureExecutors(sc: SparkContext,
--- End diff --

Do not add this function here; add it to DistributionUtil, as in #365, so it can be reused for the Spark 2 integration.
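For context, a helper like this usually boils down to a call into Spark's executor-allocation API. Below is a rough sketch of what a DistributionUtil version might look like, assuming the private[spark] CoarseGrainedSchedulerBackend/ExecutorAllocationClient API of the Spark 1.x/2.x era (reachable because the file lives under the org.apache.spark package); this is a sketch under those assumptions, not the actual CarbonData implementation:

package org.apache.spark.sql.hive

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

// Sketch only: object and doc are illustrative, not CarbonData code.
object ExecutorRequestSketch {

  /**
   * Ask the cluster manager for requiredExecutors executors in total.
   * Returns false when the scheduler backend cannot allocate executors
   * dynamically (e.g. local mode).
   */
  def ensureExecutors(
      sc: SparkContext,
      requiredExecutors: Int,
      localityAwareTasks: Int = 0,
      hostToLocalTaskCount: Map[String, Int] = Map.empty): Boolean = {
    sc.schedulerBackend match {
      case backend: CoarseGrainedSchedulerBackend if requiredExecutors > 0 =>
        // requestTotalExecutors is private[spark]; it is visible here only
        // because this object is declared inside the org.apache.spark namespace
        backend.requestTotalExecutors(requiredExecutors, localityAwareTasks, hostToLocalTaskCount)
      case _ => false
    }
  }
}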




[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...

2016-11-29 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/362#discussion_r90031916
  
--- Diff: integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---
(same hunk as quoted in full above)

[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...

2016-11-29 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/362#discussion_r90031795
  
--- Diff: integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala ---
(same hunk as quoted in full above)

[GitHub] incubator-carbondata pull request #362: [CARBONDATA-459] Block distribution ...

2016-11-28 Thread manishgupta88
GitHub user manishgupta88 opened a pull request:

https://github.com/apache/incubator-carbondata/pull/362

[CARBONDATA-459] Block distribution is wrong in case of dynamic allocation=true

Problem: Block distribution is wrong in case of dynamic allocation=true

Analysis: When dynamic allocation is true and the configured maximum number of executors is greater than the initial number, Carbon fails to request the configured maximum. Resources are therefore under-utilized: as the number of blocks grows, block distribution remains limited to the number of nodes, so fewer tasks are launched. This under-utilization degrades both query and data-load performance.

Fix: Request the configured maximum number of executors when dynamic allocation is true.

Impact area: Query and data-load performance, due to the resource under-utilization described above.
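For reference, the scenario above corresponds to a configuration along these lines. All keys are standard Spark properties; the concrete values are illustrative only and not taken from the report:

import org.apache.spark.SparkConf

object DynamicAllocationConfExample {
  // Illustrative settings under which the reported problem shows up:
  // few initial executors, a much larger configured maximum.
  val conf: SparkConf = new SparkConf()
    .set("spark.dynamicAllocation.enabled", "true")    // dynamic allocation on
    .set("spark.dynamicAllocation.minExecutors", "2")  // only a few executors at start
    .set("spark.dynamicAllocation.maxExecutors", "50") // maximum Carbon should be able to request
    .set("spark.shuffle.service.enabled", "true")      // prerequisite for dynamic allocation
}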

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/manishgupta88/incubator-carbondata dynamic_allocation_block_distribution

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-carbondata/pull/362.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #362


commit 0019dfc08ef589d75c45feecd8b559bca0b35f08
Author: manishgupta88 
Date:   2016-11-28T10:07:11Z

Problem: Block distribution is wrong in case of dynamic allocation=true

(The rest of the commit message repeats the pull request description above verbatim.)



