[CARBONDATA-227] In block distribution, parallelism is decided initially and not re-initialized after requesting new executors. Due to this, the per-node task initialization goes wrong.
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/acb1d979 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/acb1d979 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/acb1d979 Branch: refs/heads/branch-0.1 Commit: acb1d979e4f61bf727f69ce553f0f9cad0954ae8 Parents: e1f34cc Author: mohammadshahidkhan <mohdshahidkhan1...@gmail.com> Authored: Thu Sep 8 10:37:49 2016 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Sep 22 09:34:05 2016 +0530 ---------------------------------------------------------------------- .../apache/carbondata/spark/rdd/CarbonMergerRDD.scala | 4 ++-- .../org/apache/carbondata/spark/rdd/CarbonScanRDD.scala | 11 ++++++++--- 2 files changed, 10 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/acb1d979/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index f160fd9..54d7539 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -55,7 +55,6 @@ class CarbonMergerRDD[K, V]( confExecutorsTemp: String) extends RDD[(K, V)](sc, Nil) with Logging { - val defaultParallelism = sc.defaultParallelism sc.setLocalProperty("spark.scheduler.pool", "DDL") sc.setLocalProperty("spark.job.interruptOnCancel", "true") @@ -228,6 +227,7 @@ class CarbonMergerRDD[K, V]( ) val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) = 
QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier) + var defaultParallelism = sparkContext.defaultParallelism val result = new util.ArrayList[Partition](defaultParallelism) // mapping of the node and block list. @@ -299,7 +299,7 @@ class CarbonMergerRDD[K, V]( maxTimes = maxTimes - 1 } logInfo("Time taken to wait for executor allocation is =" + ((30 - maxTimes) * 500) + "millis") - + defaultParallelism = sparkContext.defaultParallelism var i = 0 val nodeTaskBlocksMap: util.Map[String, util.List[NodeInfo]] = new util.HashMap[String, util http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/acb1d979/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 5f50644..497d9f8 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -73,9 +73,9 @@ class CarbonScanRDD[V: ClassTag]( baseStoreLocation: String) extends RDD[V](sc, Nil) with Logging { - val defaultParallelism = sc.defaultParallelism override def getPartitions: Array[Partition] = { + var defaultParallelism = sparkContext.defaultParallelism val statisticRecorder = CarbonTimeStatisticsFactory.getQueryStatisticsRecorderInstance() val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) = QueryPlanUtil.createCarbonInputFormat(queryModel.getAbsoluteTableIdentifier) @@ -110,13 +110,18 @@ class CarbonScanRDD[V: ClassTag]( new BlockletInfos(inputSplit.getNumberOfBlocklets, 0, inputSplit.getNumberOfBlocklets) ) ) + var activeNodes = Array[String]() + if(blockListTemp.nonEmpty) { + activeNodes = DistributionUtil + .ensureExecutorsAndGetNodeList(blockListTemp.toArray, 
sparkContext) + } + defaultParallelism = sparkContext.defaultParallelism val blockList = CarbonLoaderUtil. distributeBlockLets(blockListTemp.asJava, defaultParallelism).asScala + if (blockList.nonEmpty) { var statistic = new QueryStatistic() // group blocks to nodes, tasks - val activeNodes = DistributionUtil - .ensureExecutorsAndGetNodeList(blockList.toArray, sparkContext) val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism, activeNodes.toList.asJava