01lin opened a new issue #4160:
URL: https://github.com/apache/carbondata/issues/4160


   During `insert into` or `load data`, the total number of tasks in the stage 
is almost equal to the number of hosts, which is generally much smaller than 
the number of available executors. This low parallelism makes the stage run 
slower. Why must the parallelism be constrained to the number of distinct 
hosts? Could more tasks be started to increase parallelism and improve 
resource utilization? Thanks.
   
   org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala: loadDataFrame
   ```
     /**
      * Execute load process to load from input dataframe
      */
     private def loadDataFrame(
         sqlContext: SQLContext,
         dataFrame: Option[DataFrame],
         carbonLoadModel: CarbonLoadModel
     ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
       try {
         val rdd = dataFrame.get.rdd
         // Get the preferred locations via getPreferredLocs and take the
         // distinct values: this yields the list of hosts
         val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p =>
           DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
         }.distinct.length
         // Ensure the number of executors matches the number of nodes holding data
         val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(
           nodeNumOfData,
           sqlContext.sparkContext)
         val newRdd = new DataLoadCoalescedRDD[Row](
           sqlContext.sparkSession,
           rdd,
           nodes.toArray.distinct)
   
         new NewDataFrameLoaderRDD(
           sqlContext.sparkSession,
           new DataLoadResultImpl(),
           carbonLoadModel,
           newRdd
         ).collect()
       } catch {
         case ex: Exception =>
           LOGGER.error("load data frame failed", ex)
           throw ex
       }
     }
   ```
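
To make the concern concrete, here is a minimal Spark-free sketch in plain Scala of how coalescing by distinct host caps the task count. It assumes one output task per distinct host, as the observed task count suggests; it is not the actual `DataLoadCoalescedRDD` logic. `Part`, `HostCoalesceSketch`, and `coalesceByHost` are hypothetical names used only for illustration.

```scala
// Hypothetical stand-in for an input partition and its preferred hosts.
final case class Part(index: Int, preferredHosts: Seq[String])

object HostCoalesceSketch {
  // Assumption: every input partition is assigned to its first preferred host,
  // and each distinct host becomes exactly one output task.
  def coalesceByHost(parts: Seq[Part]): Map[String, Seq[Int]] =
    parts
      .filter(_.preferredHosts.nonEmpty)
      .groupBy(_.preferredHosts.head)
      .map { case (host, ps) => host -> ps.map(_.index) }

  def main(args: Array[String]): Unit = {
    // 12 input partitions spread over 3 hosts.
    val parts = (0 until 12).map(i => Part(i, Seq(s"host-${i % 3}")))
    val tasks = coalesceByHost(parts)
    println(s"input partitions: ${parts.size}, tasks after coalescing: ${tasks.size}")
  }
}
```

With 12 input partitions on 3 hosts, the sketch ends up with 3 tasks no matter how many executors or cores are available, which matches the behaviour described above.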
   

