Repository: carbondata
Updated Branches:
  refs/heads/carbonstore-rebase 5c55dfe19 -> 132fbd4d8 (forced update)


[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

Carbondata assign blocks to nodes at the beginning of data loading.
Previous block allocation strategy is block number based and it will
suffer skewed data problem if the size of input files differs a lot.

We introduced a size based block allocation strategy to optimize data
loading performance in skewed data scenario.

This closes #1808


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/132fbd4d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/132fbd4d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/132fbd4d

Branch: refs/heads/carbonstore-rebase
Commit: 132fbd4d8d2a91efca542bd31ae5935171802b7e
Parents: c82734e
Author: xuchuanyin <xuchuan...@hust.edu.cn>
Authored: Thu Feb 8 14:42:39 2018 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Fri Feb 9 21:45:48 2018 +0800

----------------------------------------------------------------------
 .../constants/CarbonLoadOptionConstants.java    |  10 +
 .../core/datastore/block/TableBlockInfo.java    |  29 ++
 .../carbondata/core/util/CarbonProperties.java  |  11 +
 docs/useful-tips-on-carbondata.md               |   1 +
 .../sdv/generated/MergeIndexTestCase.scala      |  11 +-
 .../CarbonIndexFileMergeTestCase.scala          |  19 +-
 .../StandardPartitionTableLoadingTestCase.scala |   5 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |   4 +-
 .../spark/sql/hive/DistributionUtil.scala       |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  18 +-
 .../merger/NodeMultiBlockRelation.java          |  40 ++
 .../processing/util/CarbonLoaderUtil.java       | 494 ++++++++++++-------
 .../processing/util/CarbonLoaderUtilTest.java   | 125 +++++
 13 files changed, 564 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/132fbd4d/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index bcfeba0..a6bf60f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants {
    */
   public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 10000000;
 
+  /**
+   * enable block size based block allocation while loading data. By default, 
carbondata assigns
+   * blocks to node based on block number. If this option is set to `true`, 
carbondata will
+   * consider block size first and make sure that all the nodes will process 
almost equal size of
+   * data. This option is especially useful when you encounter skewed data.
+   */
+  @CarbonProperty
+  public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
+      = "carbon.load.skewedDataOptimization.enabled";
+  public static final String 
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/132fbd4d/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index a7bfdba..c0cebe0 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, 
Serializable {
 
   private String dataMapWriterPath;
 
+  /**
+   * comparator to sort by block size in descending order.
+   * Since each line is not exactly the same, the size of a InputSplit may 
differs,
+   * so we allow some deviation for these splits.
+   */
+  public static final Comparator<Distributable> DATA_SIZE_DESC_COMPARATOR =
+      new Comparator<Distributable>() {
+        @Override public int compare(Distributable o1, Distributable o2) {
+          long diff =
+              ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) 
o2).getBlockLength();
+          return diff < 0 ? 1 : (diff == 0 ? 0 : -1);
+        }
+      };
+
   public TableBlockInfo(String filePath, long blockOffset, String segmentId,
       String[] locations, long blockLength, ColumnarFormatVersion version,
       String[] deletedDeltaFilePath) {
@@ -434,4 +450,17 @@ public class TableBlockInfo implements Distributable, 
Serializable {
   public void setDataMapWriterPath(String dataMapWriterPath) {
     this.dataMapWriterPath = dataMapWriterPath;
   }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("TableBlockInfo{");
+    sb.append("filePath='").append(filePath).append('\'');
+    sb.append(", blockOffset=").append(blockOffset);
+    sb.append(", blockLength=").append(blockLength);
+    sb.append(", segmentId='").append(segmentId).append('\'');
+    sb.append(", blockletId='").append(blockletId).append('\'');
+    sb.append(", locations=").append(Arrays.toString(locations));
+    sb.append('}');
+    return sb.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/132fbd4d/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 9d52669..474988c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1235,6 +1235,17 @@ public final class CarbonProperties {
       return CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR_DEFAULT;
     }
   }
+
+  /**
+   * whether optimization for skewed data is enabled
+   * @return true, if enabled; false for not enabled.
+   */
+  public boolean isLoadSkewedDataOptimizationEnabled() {
+    String skewedEnabled = getProperty(
+        CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION,
+        
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT);
+    return skewedEnabled.equalsIgnoreCase("true");
+  }
   /**
    * returns true if carbon property
    * @param key

http://git-wip-us.apache.org/repos/asf/carbondata/blob/132fbd4d/docs/useful-tips-on-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/useful-tips-on-carbondata.md 
b/docs/useful-tips-on-carbondata.md
index aaf6460..f403d7c 100644
--- a/docs/useful-tips-on-carbondata.md
+++ b/docs/useful-tips-on-carbondata.md
@@ -169,5 +169,6 @@
   | carbon.use.local.dir | spark/carbonlib/carbon.properties | Data loading | 
Whether use YARN local directories for multi-table load disk load balance | If 
this is set it to true CarbonData will use YARN local directories for 
multi-table load disk load balance, that will improve the data load 
performance. |
   | carbon.use.multiple.temp.dir | spark/carbonlib/carbon.properties | Data 
loading | Whether to use multiple YARN local directories during table data 
loading for disk load balance | After enabling 'carbon.use.local.dir', if this 
is set to true, CarbonData will use all YARN local directories during data load 
for disk load balance, that will improve the data load performance. Please 
enable this property when you encounter disk hotspot problem during data 
loading. |
   | carbon.sort.temp.compressor | spark/carbonlib/carbon.properties | Data 
loading | Specify the name of compressor to compress the intermediate sort 
temporary files during sort procedure in data loading. | The optional values 
are 'SNAPPY','GZIP','BZIP2','LZ4' and empty. By default, empty means that 
Carbondata will not compress the sort temp files. This parameter will be useful 
if you encounter disk bottleneck. |
+  | carbon.load.skewedDataOptimization.enabled | 
spark/carbonlib/carbon.properties | Data loading | Whether to enable size based 
block allocation strategy for data loading. | When loading, carbondata will use 
file size based block allocation strategy for task distribution. It will make 
sure that all the executors process the same size of data -- It's useful if the 
size of your input data files varies widely, say 1MB~1GB. |
 
   Note: If your CarbonData instance is provided only for query, you may 
specify the property 'spark.speculation=true' which is in conf directory of 
spark.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/132fbd4d/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
 
b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
index ed6d741..e2207bf 100644
--- 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
+++ 
b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
@@ -52,9 +52,8 @@ class MergeIndexTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE 
carbon_automation_merge 
OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVe
 
rsion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect
 
     val table = 
CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_merge")
-    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, 
table.getTablePath)
     new CarbonIndexFileMergeWriter()
-      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), 
false)
+      
.mergeCarbonIndexFilesOfSegment(CarbonTablePath.getSegmentPath(table.getTablePath,"0"),
 false)
     assert(getIndexFileCount("default", "carbon_automation_merge", "0") == 0)
     checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""),
       sql("""Select count(*) from carbon_automation_merge"""))
@@ -71,11 +70,10 @@ class MergeIndexTestCase extends QueryTest with 
BeforeAndAfterAll {
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 
2)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 
2)
     val table = 
CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge")
-    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, 
table.getTablePath)
     new CarbonIndexFileMergeWriter()
-      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), 
false)
+      
.mergeCarbonIndexFilesOfSegment(CarbonTablePath.getSegmentPath(table.getTablePath,"0"),
 false)
     new CarbonIndexFileMergeWriter()
-      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","1"), 
false)
+      
.mergeCarbonIndexFilesOfSegment(CarbonTablePath.getSegmentPath(table.getTablePath,"1"),
 false)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 
0)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 
0)
     checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), 
rows)
@@ -95,9 +93,8 @@ class MergeIndexTestCase extends QueryTest with 
BeforeAndAfterAll {
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 
2)
     sql("ALTER TABLE carbon_automation_nonmerge COMPACT 'minor'").collect()
     val table = 
CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge")
-    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, 
table.getTablePath)
     new CarbonIndexFileMergeWriter()
-      
.mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0.1"), false)
+      
.mergeCarbonIndexFilesOfSegment(CarbonTablePath.getSegmentPath(table.getTablePath,"0.1"),
 false)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0.1") 
== 0)
     checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), 
rows)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/132fbd4d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index 895b0b5..bc22138 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -62,9 +62,8 @@ class CarbonIndexFileMergeTestCase
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge 
OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='100')")
     val table = 
CarbonMetadata.getInstance().getCarbonTable("default","indexmerge")
-    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, 
table.getTablePath)
     new CarbonIndexFileMergeWriter()
-      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), 
false)
+      
.mergeCarbonIndexFilesOfSegment(CarbonTablePath.getSegmentPath(table.getTablePath,"0"),
 false)
     assert(getIndexFileCount("default_indexmerge", "0") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""),
       sql("""Select count(*) from indexmerge"""))
@@ -86,11 +85,10 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
     val table = 
CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
-    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, 
table.getTablePath)
     new CarbonIndexFileMergeWriter()
-      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), 
false)
+      
.mergeCarbonIndexFilesOfSegment(CarbonTablePath.getSegmentPath(table.getTablePath,"0"),
 false)
     new CarbonIndexFileMergeWriter()
-      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","1"), 
false)
+      
.mergeCarbonIndexFilesOfSegment(CarbonTablePath.getSegmentPath(table.getTablePath,"1"),
 false)
     assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
@@ -112,11 +110,10 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
     val table = 
CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
-    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, 
table.getTablePath)
     new CarbonIndexFileMergeWriter()
-      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), 
false)
+      
.mergeCarbonIndexFilesOfSegment(CarbonTablePath.getSegmentPath(table.getTablePath,"0"),
 false)
     new CarbonIndexFileMergeWriter()
-      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","1"), 
false)
+      
.mergeCarbonIndexFilesOfSegment(CarbonTablePath.getSegmentPath(table.getTablePath,"1"),
 false)
     assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
@@ -142,9 +139,8 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
     val table = 
CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
-    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, 
table.getTablePath)
     new CarbonIndexFileMergeWriter()
-      
.mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0.1"), false)
+      
.mergeCarbonIndexFilesOfSegment(CarbonTablePath.getSegmentPath(table.getTablePath,"0.1"),
 false)
     assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
   }
@@ -172,9 +168,8 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
     val table = 
CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
-    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, 
table.getTablePath)
     new CarbonIndexFileMergeWriter()
-      
.mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0.1"), false)
+      
.mergeCarbonIndexFilesOfSegment(CarbonTablePath.getSegmentPath(table.getTablePath,"0.1"),
 false)
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "2") == 100)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/132fbd4d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 2bd6031..0405df2 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -31,7 +31,7 @@ import 
org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFi
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 
 class StandardPartitionTableLoadingTestCase extends QueryTest with 
BeforeAndAfterAll {
@@ -430,8 +430,7 @@ class StandardPartitionTableLoadingTestCase extends 
QueryTest with BeforeAndAfte
     sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE smallpartitionfiles")
     FileUtils.deleteDirectory(folder)
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", 
"smallpartitionfiles")
-    val carbonTablePath = 
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val segmentDir = carbonTablePath.getSegmentDir("0", "0")
+    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, 
"0")
     assert(new File(segmentDir).listFiles().length < 50)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/132fbd4d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 06acbba..8ba2767 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -281,8 +281,10 @@ class NewCarbonDataLoadRDD[K, V](
         val format = new CSVInputFormat
 
         val split = theSplit.asInstanceOf[CarbonNodePartition]
+        val inputSize = split.blocksDetails.map(_.getBlockLength).sum * 0.1 * 
10  / 1024 / 1024
         logInfo("Input split: " + split.serializableHadoopSplit)
-        logInfo("The Block Count in this node :" + 
split.nodeBlocksDetail.length)
+        logInfo("The block count in this node: " + 
split.nodeBlocksDetail.length)
+        logInfo(f"The input data size in this node: $inputSize%.2fMB")
         
CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap(
             split.serializableHadoopSplit, split.nodeBlocksDetail.length)
         carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/132fbd4d/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 1958d61..a676dd8 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -128,7 +128,7 @@ object DistributionUtil {
    */
   def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
       sparkContext: SparkContext): Seq[String] = {
-    val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava)
+    val nodeMapping = CarbonLoaderUtil.nodeBlockMapping(blockList.asJava)
     ensureExecutorsByNumberAndGetNodeList(nodeMapping, blockList, sparkContext)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/132fbd4d/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 828c067..daafe6a 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -1045,10 +1045,16 @@ object CarbonDataRDDFactory {
     val startTime = System.currentTimeMillis
     val activeNodes = DistributionUtil
       .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
-    val nodeBlockMapping =
-      CarbonLoaderUtil
-        .nodeBlockMapping(blockList.toSeq.asJava, -1, 
activeNodes.toList.asJava).asScala
-        .toSeq
+    val skewedDataOptimization = CarbonProperties.getInstance()
+      .isLoadSkewedDataOptimizationEnabled()
+    val blockAssignStrategy = if (skewedDataOptimization) {
+      CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_SIZE_FIRST
+    } else {
+      CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST
+    }
+    LOGGER.info(s"Allocating block to nodes using strategy: 
$blockAssignStrategy")
+    val nodeBlockMapping = 
CarbonLoaderUtil.nodeBlockMapping(blockList.toSeq.asJava, -1,
+      activeNodes.toList.asJava, blockAssignStrategy).asScala.toSeq
     val timeElapsed: Long = System.currentTimeMillis - startTime
     LOGGER.info("Total Time taken in block allocation: " + timeElapsed)
     LOGGER.info(s"Total no of blocks: ${ blockList.length }, " +
@@ -1056,7 +1062,9 @@ object CarbonDataRDDFactory {
     var str = ""
     nodeBlockMapping.foreach { entry =>
       val tableBlock = entry._2
-      str = str + "#Node: " + entry._1 + " no.of.blocks: " + tableBlock.size()
+      val totalSize = 
tableBlock.asScala.map(_.asInstanceOf[TableBlockInfo].getBlockLength).sum
+      str = str + "#Node: " + entry._1 + ", no.of.blocks: " + 
tableBlock.size() +
+            f", totalsize.of.blocks: ${totalSize * 0.1 * 10 / 1024 
/1024}%.2fMB"
       tableBlock.asScala.foreach(tableBlockInfo =>
         if (!tableBlockInfo.getLocations.exists(hostentry =>
           hostentry.equalsIgnoreCase(entry._1)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/132fbd4d/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java
index ec2ddaf..1bb5736 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java
@@ -16,15 +16,41 @@
  */
 package org.apache.carbondata.processing.merger;
 
+import java.util.Comparator;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 
 public class NodeMultiBlockRelation implements 
Comparable<NodeMultiBlockRelation> {
 
   private final List<Distributable> blocks;
   private final String node;
 
+  /**
+   * comparator to sort by data size in descending order. This is used to 
assign big blocks to
+   * bigger nodes first.
+   */
+  public static final Comparator<NodeMultiBlockRelation> 
DATA_SIZE_DESC_COMPARATOR =
+      new Comparator<NodeMultiBlockRelation>() {
+        @Override
+        public int compare(NodeMultiBlockRelation o1, NodeMultiBlockRelation 
o2) {
+          long diff = o1.getTotalSizeOfBlocks() - o2.getTotalSizeOfBlocks();
+          return diff > 0 ? -1 : (diff < 0 ? 1 : 0);
+        }
+      };
+  /**
+   * comparator to sort by data size in ascending order. This is used to 
assign left over blocks to
+   * smaller nodes first.
+   */
+  public static final Comparator<NodeMultiBlockRelation> 
DATA_SIZE_ASC_COMPARATOR =
+      new Comparator<NodeMultiBlockRelation>() {
+        @Override
+        public int compare(NodeMultiBlockRelation o1, NodeMultiBlockRelation 
o2) {
+          long diff = o1.getTotalSizeOfBlocks() - o2.getTotalSizeOfBlocks();
+          return diff > 0 ? 1 : (diff < 0 ? -1 : 0);
+        }
+      };
   public NodeMultiBlockRelation(String node, List<Distributable> blocks) {
     this.node = node;
     this.blocks = blocks;
@@ -39,6 +65,20 @@ public class NodeMultiBlockRelation implements 
Comparable<NodeMultiBlockRelation
     return node;
   }
 
+  /**
+   * get the total size of the blocks
+   * @return size in bytes
+   */
+  public long getTotalSizeOfBlocks() {
+    long totalSize = 0;
+    if (blocks.get(0) instanceof TableBlockInfo) {
+      for (Distributable block : blocks) {
+        totalSize += ((TableBlockInfo) block).getBlockLength();
+      }
+    }
+    return totalSize;
+  }
+
   @Override public int compareTo(NodeMultiBlockRelation obj) {
     return this.blocks.size() - obj.getBlocks().size();
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/132fbd4d/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index d7a0fe9..f675840 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -24,7 +24,16 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -35,6 +44,7 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import 
org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -55,7 +65,6 @@ import 
org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.processing.merger.NodeBlockRelation;
 import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;
 import static org.apache.carbondata.core.enums.EscapeSequences.*;
 
@@ -70,6 +79,23 @@ public final class CarbonLoaderUtil {
   private CarbonLoaderUtil() {
   }
 
+  /**
+   * strategy for assign blocks to nodes/executors
+   */
+  public enum BlockAssignmentStrategy {
+    BLOCK_NUM_FIRST("Assign blocks to node base on number of blocks"),
+    BLOCK_SIZE_FIRST("Assign blocks to node base on data size of blocks");
+    private String name;
+    BlockAssignmentStrategy(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public String toString() {
+      return this.getClass().getSimpleName() + ':' + this.name;
+    }
+  }
+
   public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) 
{
     String segmentPath = CarbonTablePath.getSegmentPath(
         loadModel.getTablePath(), currentLoad + "");
@@ -457,9 +483,9 @@ public final class CarbonLoaderUtil {
   public static Map<String, List<List<Distributable>>> nodeBlockTaskMapping(
       List<Distributable> blockInfos, int noOfNodesInput, int parallelism,
       List<String> activeNode) {
-
     Map<String, List<Distributable>> mapOfNodes =
-        CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, 
activeNode);
+        CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, 
activeNode,
+            BlockAssignmentStrategy.BLOCK_NUM_FIRST);
     int taskPerNode = parallelism / mapOfNodes.size();
     //assigning non zero value to noOfTasksPerNode
     int noOfTasksPerNode = taskPerNode == 0 ? 1 : taskPerNode;
@@ -475,7 +501,8 @@ public final class CarbonLoaderUtil {
    */
   public static Map<String, List<Distributable>> 
nodeBlockMapping(List<Distributable> blockInfos,
       int noOfNodesInput) {
-    return nodeBlockMapping(blockInfos, noOfNodesInput, null);
+    return nodeBlockMapping(blockInfos, noOfNodesInput, null,
+        BlockAssignmentStrategy.BLOCK_NUM_FIRST);
   }
 
   /**
@@ -490,82 +517,59 @@ public final class CarbonLoaderUtil {
   }
 
   /**
-   * the method returns the number of required executors
-   *
-   * @param blockInfos
-   * @return
-   */
-  public static Map<String, List<Distributable>> getRequiredExecutors(
-      List<Distributable> blockInfos) {
-    List<NodeBlockRelation> flattenedList =
-        new 
ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    for (Distributable blockInfo : blockInfos) {
-      try {
-        for (String eachNode : blockInfo.getLocations()) {
-          NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
-          flattenedList.add(nbr);
-        }
-      } catch (IOException e) {
-        throw new RuntimeException("error getting location of block: " + 
blockInfo.toString(), e);
-      }
-    }
-    // sort the flattened data.
-    Collections.sort(flattenedList);
-    Map<String, List<Distributable>> nodeAndBlockMapping =
-        new LinkedHashMap<String, List<Distributable>>(
-            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    // from the flattened list create a mapping of node vs Data blocks.
-    createNodeVsBlockMapping(flattenedList, nodeAndBlockMapping);
-    return nodeAndBlockMapping;
-  }
-
-  /**
    * This method will divide the blocks among the nodes as per the data 
locality
    *
-   * @param blockInfos
+   * @param blockInfos blocks
    * @param noOfNodesInput -1 if number of nodes has to be decided
    *                       based on block location information
-   * @return
+   * @param blockAssignmentStrategy strategy used to assign blocks
+   * @return a map that maps node to blocks
    */
-  public static Map<String, List<Distributable>> 
nodeBlockMapping(List<Distributable> blockInfos,
-      int noOfNodesInput, List<String> activeNodes) {
-
-    Map<String, List<Distributable>> nodeBlocksMap =
-        new HashMap<String, 
List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    List<NodeBlockRelation> flattenedList =
-        new 
ArrayList<NodeBlockRelation>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    Set<Distributable> uniqueBlocks =
-        new 
HashSet<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    Set<String> nodes = new 
HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    createFlattenedListFromMap(blockInfos, flattenedList, uniqueBlocks, nodes);
+  public static Map<String, List<Distributable>> nodeBlockMapping(
+      List<Distributable> blockInfos, int noOfNodesInput, List<String> 
activeNodes,
+      BlockAssignmentStrategy blockAssignmentStrategy) {
+    ArrayList<NodeMultiBlockRelation> rtnNode2Blocks = new ArrayList<>();
+
+    Set<Distributable> uniqueBlocks = new HashSet<>(blockInfos);
+    ArrayList<NodeMultiBlockRelation> originNode2Blocks = 
createNode2BlocksMapping(blockInfos);
+    Set<String> nodes = new HashSet<>(originNode2Blocks.size());
+    for (NodeMultiBlockRelation relation : originNode2Blocks) {
+      nodes.add(relation.getNode());
+    }
 
     int noofNodes = (-1 == noOfNodesInput) ? nodes.size() : noOfNodesInput;
     if (null != activeNodes) {
       noofNodes = activeNodes.size();
     }
-    int blocksPerNode = blockInfos.size() / noofNodes;
-    blocksPerNode = blocksPerNode <= 0 ? 1 : blocksPerNode;
 
-    // sort the flattened data.
-    Collections.sort(flattenedList);
-
-    Map<String, List<Distributable>> nodeAndBlockMapping =
-        new LinkedHashMap<String, List<Distributable>>(
-            CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    // from the flattened list create a mapping of node vs Data blocks.
-    createNodeVsBlockMapping(flattenedList, nodeAndBlockMapping);
+    // calculate the average expected size for each node
+    long sizePerNode = 0;
+    if (BlockAssignmentStrategy.BLOCK_NUM_FIRST == blockAssignmentStrategy) {
+      sizePerNode = blockInfos.size() / noofNodes;
+      sizePerNode = sizePerNode <= 0 ? 1 : sizePerNode;
+    } else if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST == 
blockAssignmentStrategy) {
+      long totalFileSize = 0;
+      for (Distributable blockInfo : uniqueBlocks) {
+        totalFileSize += ((TableBlockInfo) blockInfo).getBlockLength();
+      }
+      sizePerNode = totalFileSize / noofNodes;
+    }
 
-    // so now we have a map of node vs blocks. allocate the block as per the 
order
-    createOutputMap(nodeBlocksMap, blocksPerNode, uniqueBlocks, 
nodeAndBlockMapping, activeNodes);
+    // assign blocks to each node
+    assignBlocksByDataLocality(rtnNode2Blocks, sizePerNode, uniqueBlocks, 
originNode2Blocks,
+        activeNodes, blockAssignmentStrategy);
 
     // if any blocks remain then assign them to nodes in round robin.
-    assignLeftOverBlocks(nodeBlocksMap, uniqueBlocks, blocksPerNode, 
activeNodes);
+    assignLeftOverBlocks(rtnNode2Blocks, uniqueBlocks, sizePerNode, 
activeNodes,
+        blockAssignmentStrategy);
 
-    return nodeBlocksMap;
+    // convert
+    Map<String, List<Distributable>> rtnNodeBlocksMap =
+        new HashMap<String, 
List<Distributable>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    for (NodeMultiBlockRelation relation : rtnNode2Blocks) {
+      rtnNodeBlocksMap.put(relation.getNode(), relation.getBlocks());
+    }
+    return rtnNodeBlocksMap;
   }
 
   /**
@@ -640,92 +644,207 @@ public final class CarbonLoaderUtil {
   }
 
   /**
-   * If any left over data blocks are present then assign those to nodes in 
round robin way.
-   *
-   * @param outputMap
-   * @param uniqueBlocks
+   * If any left over data blocks are present then assign those to nodes in 
round robin way. This
+   * will not obey the data locality.
    */
-  private static void assignLeftOverBlocks(Map<String, List<Distributable>> 
outputMap,
-      Set<Distributable> uniqueBlocks, int noOfBlocksPerNode, List<String> 
activeNodes) {
+  private static void assignLeftOverBlocks(ArrayList<NodeMultiBlockRelation> 
outputMap,
+      Set<Distributable> leftOverBlocks, long expectedSizePerNode, 
List<String> activeNodes,
+      BlockAssignmentStrategy blockAssignmentStrategy) {
+    Map<String, Integer> node2Idx = new HashMap<>(outputMap.size());
+    for (int idx = 0; idx < outputMap.size(); idx++) {
+      node2Idx.put(outputMap.get(idx).getNode(), idx);
+    }
 
+    // iterate all the nodes and try to allocate blocks to the nodes
     if (activeNodes != null) {
       for (String activeNode : activeNodes) {
-        List<Distributable> blockLst = outputMap.get(activeNode);
-        if (null == blockLst) {
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("Second assignment iteration: assign for executor: " + 
activeNode);
+        }
+
+        Integer idx;
+        List<Distributable> blockLst;
+        if (node2Idx.containsKey(activeNode)) {
+          idx = node2Idx.get(activeNode);
+          blockLst = outputMap.get(idx).getBlocks();
+        } else {
+          idx = node2Idx.size();
           blockLst = new 
ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
         }
-        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
-        if (blockLst.size() > 0) {
-          outputMap.put(activeNode, blockLst);
+        populateBlocks(leftOverBlocks, expectedSizePerNode, blockLst, 
blockAssignmentStrategy);
+
+        if (!node2Idx.containsKey(activeNode) && blockLst.size() > 0) {
+          outputMap.add(idx, new NodeMultiBlockRelation(activeNode, blockLst));
+          node2Idx.put(activeNode, idx);
         }
       }
     } else {
-      for (Map.Entry<String, List<Distributable>> entry : 
outputMap.entrySet()) {
-        List<Distributable> blockLst = entry.getValue();
-        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
+      for (NodeMultiBlockRelation entry : outputMap) {
+        List<Distributable> blockLst = entry.getBlocks();
+        populateBlocks(leftOverBlocks, expectedSizePerNode, blockLst, 
blockAssignmentStrategy);
       }
-
     }
 
-    for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
-      Iterator<Distributable> blocks = uniqueBlocks.iterator();
-      if (blocks.hasNext()) {
-        Distributable block = blocks.next();
-        List<Distributable> blockLst = entry.getValue();
-        blockLst.add(block);
-        blocks.remove();
-      }
+    // if there is still blocks left, allocate them in round robin manner to 
each nodes
+    assignBlocksUseRoundRobin(outputMap, leftOverBlocks, 
blockAssignmentStrategy);
+  }
+
+  /**
+   * assign remaining blocks to nodes
+   *
+   * @param remainingBlocks blocks to be allocated
+   * @param expectedSizePerNode expected size for each node
+   * @param blockLst destination for the blocks to be allocated
+   * @param blockAssignmentStrategy block assignment stretegy
+   */
+  private static void populateBlocks(Set<Distributable> remainingBlocks,
+      long expectedSizePerNode, List<Distributable> blockLst,
+      BlockAssignmentStrategy blockAssignmentStrategy) {
+    switch (blockAssignmentStrategy) {
+      case BLOCK_NUM_FIRST:
+        populateBlocksByNum(remainingBlocks, expectedSizePerNode, blockLst);
+        break;
+      case BLOCK_SIZE_FIRST:
+        populateBlocksBySize(remainingBlocks, expectedSizePerNode, blockLst);
+        break;
+      default:
+        throw new IllegalArgumentException(
+            "Unsupported block assignment strategy: " + 
blockAssignmentStrategy);
     }
   }
 
   /**
-   * The method populate the blockLst to be allocate to a specific node.
-   * @param uniqueBlocks
-   * @param noOfBlocksPerNode
-   * @param blockLst
+   * Taken N number of distributable blocks from {@param remainingBlocks} and 
add them to output
+   * {@param blockLst}. After added, the total number of {@param blockLst} is 
less
+   * than {@param expectedSizePerNode}.
    */
-  private static void populateBlocks(Set<Distributable> uniqueBlocks, int 
noOfBlocksPerNode,
-      List<Distributable> blockLst) {
-    Iterator<Distributable> blocks = uniqueBlocks.iterator();
-    //if the node is already having the per block nodes then avoid assign the 
extra blocks
-    if (blockLst.size() == noOfBlocksPerNode) {
+  private static void populateBlocksByNum(Set<Distributable> remainingBlocks,
+      long expectedSizePerNode, List<Distributable> blockLst) {
+    Iterator<Distributable> blocks = remainingBlocks.iterator();
+    // if the node is already having the per block nodes then avoid assign the 
extra blocks
+    if (blockLst.size() == expectedSizePerNode) {
       return;
     }
     while (blocks.hasNext()) {
       Distributable block = blocks.next();
       blockLst.add(block);
       blocks.remove();
-      if (blockLst.size() >= noOfBlocksPerNode) {
+      if (blockLst.size() >= expectedSizePerNode) {
         break;
       }
     }
   }
 
   /**
-   * To create the final output of the Node and Data blocks
-   *
-   * @param outputMap
-   * @param blocksPerNode
-   * @param uniqueBlocks
-   * @param nodeAndBlockMapping
-   * @param activeNodes
+   * Taken N number of distributable blocks from {@param remainingBlocks} and 
add them to output
+   * {@param blockLst}. After added, the total accumulated block size of 
{@param blockLst}
+   * is less than {@param expectedSizePerNode}.
    */
-  private static void createOutputMap(Map<String, List<Distributable>> 
outputMap, int blocksPerNode,
-      Set<Distributable> uniqueBlocks, Map<String, List<Distributable>> 
nodeAndBlockMapping,
-      List<String> activeNodes) {
+  private static void populateBlocksBySize(Set<Distributable> remainingBlocks,
+      long expectedSizePerNode, List<Distributable> blockLst) {
+    Iterator<Distributable> blocks = remainingBlocks.iterator();
+    //if the node is already having the avg node size then avoid assign the 
extra blocks
+    long fileSize = 0;
+    for (Distributable block : blockLst) {
+      fileSize += ((TableBlockInfo) block).getBlockLength();
+    }
+    if (fileSize >= expectedSizePerNode) {
+      LOGGER.debug("Capacity is full, skip allocate blocks on this node");
+      return;
+    }
 
-    ArrayList<NodeMultiBlockRelation> multiBlockRelations =
-        new ArrayList<>(nodeAndBlockMapping.size());
-    for (Map.Entry<String, List<Distributable>> entry : 
nodeAndBlockMapping.entrySet()) {
-      multiBlockRelations.add(new NodeMultiBlockRelation(entry.getKey(), 
entry.getValue()));
+    while (blocks.hasNext()) {
+      Distributable block = blocks.next();
+      long thisBlockSize = ((TableBlockInfo) block).getBlockLength();
+      if (fileSize < expectedSizePerNode) {
+        // `fileSize==0` means there are no blocks assigned to this node before
+        if (fileSize == 0 || fileSize + thisBlockSize <= expectedSizePerNode * 
1.1D) {
+          blockLst.add(block);
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Second Assignment iteration: "
+                + ((TableBlockInfo) block).getFilePath() + "-"
+                + ((TableBlockInfo) block).getBlockLength() + 
"-->currentNode");
+          }
+          fileSize += thisBlockSize;
+          blocks.remove();
+        }
+      } else {
+        break;
+      }
     }
-    // sort nodes based on number of blocks per node, so that nodes having 
lesser blocks
-    // are assigned first
-    Collections.sort(multiBlockRelations);
+  }
 
-    for (NodeMultiBlockRelation nodeMultiBlockRelation : multiBlockRelations) {
+  /**
+   * allocate the blocks in round robin manner
+   */
+  private static void 
assignBlocksUseRoundRobin(ArrayList<NodeMultiBlockRelation> node2Blocks,
+      Set<Distributable> remainingBlocks, BlockAssignmentStrategy 
blockAssignmentStrategy) {
+    switch (blockAssignmentStrategy) {
+      case BLOCK_NUM_FIRST:
+        roundRobinAssignBlocksByNum(node2Blocks, remainingBlocks);
+        break;
+      case BLOCK_SIZE_FIRST:
+        roundRobinAssignBlocksBySize(node2Blocks, remainingBlocks);
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported block assignment 
strategy: "
+            + blockAssignmentStrategy);
+    }
+  }
+
+  private static void 
roundRobinAssignBlocksByNum(ArrayList<NodeMultiBlockRelation> outputMap,
+      Set<Distributable> remainingBlocks) {
+    for (NodeMultiBlockRelation relation: outputMap) {
+      Iterator<Distributable> blocks = remainingBlocks.iterator();
+      if (blocks.hasNext()) {
+        Distributable block = blocks.next();
+        List<Distributable> blockLst = relation.getBlocks();
+        blockLst.add(block);
+        blocks.remove();
+      }
+    }
+  }
+
+  private static void 
roundRobinAssignBlocksBySize(ArrayList<NodeMultiBlockRelation> outputMap,
+      Set<Distributable> remainingBlocks) {
+    Iterator<Distributable> blocks = remainingBlocks.iterator();
+    while (blocks.hasNext()) {
+      // sort the allocated node-2-blocks in ascending order, the total data 
size of first one is
+      // the smallest, so we assign this block to it.
+      Collections.sort(outputMap, 
NodeMultiBlockRelation.DATA_SIZE_ASC_COMPARATOR);
+      Distributable block = blocks.next();
+      List<Distributable> blockLst = outputMap.get(0).getBlocks();
+      blockLst.add(block);
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("RoundRobin assignment iteration: "
+            + ((TableBlockInfo) block).getFilePath() + "-"
+            + ((TableBlockInfo) block).getBlockLength() + "-->" + 
outputMap.get(0).getNode());
+      }
+      blocks.remove();
+    }
+  }
+  /**
+   * allocate distributable blocks to nodes based on data locality
+   */
+  private static void assignBlocksByDataLocality(
+      ArrayList<NodeMultiBlockRelation> outputNode2Blocks,
+      long expectedSizePerNode, Set<Distributable> remainingBlocks,
+      List<NodeMultiBlockRelation> inputNode2Blocks, List<String> activeNodes,
+      BlockAssignmentStrategy blockAssignmentStrategy) {
+    if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST == blockAssignmentStrategy) {
+      // sort nodes based on data size of all blocks per node, so that nodes 
having bigger size
+      // are assigned first
+      Collections.sort(inputNode2Blocks, 
NodeMultiBlockRelation.DATA_SIZE_DESC_COMPARATOR);
+    } else {
+      // sort nodes based on number of blocks per node, so that nodes having 
lesser blocks
+      // are assigned first
+      Collections.sort(inputNode2Blocks);
+    }
+
+    Map<String, Integer> executor2Idx = new HashMap<>();
+    for (NodeMultiBlockRelation nodeMultiBlockRelation : inputNode2Blocks) {
       String nodeName = nodeMultiBlockRelation.getNode();
-      //assign the block to the node only if the node is active
+      // assign the block to the node only if the node is active
       String activeExecutor = nodeName;
       if (null != activeNodes) {
         activeExecutor = getActiveExecutor(activeNodes, nodeName);
@@ -733,29 +852,75 @@ public final class CarbonLoaderUtil {
           continue;
         }
       }
-      // this loop will be for each NODE
-      int nodeCapacity = 0;
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("First Assignment iteration: assign for executor: " + 
activeExecutor);
+      }
+
+      List<Distributable> blocksInThisNode = 
nodeMultiBlockRelation.getBlocks();
+      if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST == blockAssignmentStrategy) 
{
+        // sort blocks based on block size, so that bigger blocks will be 
assigned first
+        Collections.sort(blocksInThisNode, 
TableBlockInfo.DATA_SIZE_DESC_COMPARATOR);
+      }
+
+      long nodeCapacity = 0;
       // loop thru blocks of each Node
       for (Distributable block : nodeMultiBlockRelation.getBlocks()) {
+        if (!remainingBlocks.contains(block)) {
+          // this block has been added before
+          continue;
+        }
+        // this is the first time to add block to this node, initialize it
+        if (!executor2Idx.containsKey(activeExecutor)) {
+          Integer idx = executor2Idx.size();
+          outputNode2Blocks.add(idx, new NodeMultiBlockRelation(activeExecutor,
+              new 
ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE)));
+          executor2Idx.put(activeExecutor, idx);
+        }
 
-        // check if this is already assigned.
-        if (uniqueBlocks.contains(block)) {
-
-          if (null == outputMap.get(activeExecutor)) {
-            List<Distributable> list =
-                new 
ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-            outputMap.put(activeExecutor, list);
-          }
-          // assign this block to this node if node has capacity left
-          if (nodeCapacity < blocksPerNode) {
-            List<Distributable> infos = outputMap.get(activeExecutor);
+        // assign this block to this node if node has capacity left
+        if (BlockAssignmentStrategy.BLOCK_NUM_FIRST == 
blockAssignmentStrategy) {
+          if (nodeCapacity < expectedSizePerNode) {
+            Integer idx = executor2Idx.get(activeExecutor);
+            List<Distributable> infos = outputNode2Blocks.get(idx).getBlocks();
             infos.add(block);
             nodeCapacity++;
-            uniqueBlocks.remove(block);
+            if (LOGGER.isDebugEnabled()) {
+              LOGGER.debug(
+                  "First Assignment iteration: " + ((TableBlockInfo) 
block).getFilePath() + '-'
+                      + ((TableBlockInfo) block).getBlockLength() + "-->" + 
activeExecutor);
+            }
+            remainingBlocks.remove(block);
+          } else {
+            // No need to continue loop as node is full
+            break;
+          }
+        } else if (BlockAssignmentStrategy.BLOCK_SIZE_FIRST == 
blockAssignmentStrategy) {
+          long thisBlockSize = ((TableBlockInfo) block).getBlockLength();
+          // `nodeCapacity == 0` means that there is a huge block that already 
exceed the
+          // `expectedSize` of the node, so we have to assign it to some node, 
otherwise it will
+          // be assigned in the last RoundRobin iteration.
+          if (nodeCapacity == 0 || nodeCapacity < expectedSizePerNode) {
+            if (nodeCapacity == 0 || nodeCapacity + thisBlockSize <= 
expectedSizePerNode * 1.05D) {
+              Integer idx = executor2Idx.get(activeExecutor);
+              List<Distributable> blocks = 
outputNode2Blocks.get(idx).getBlocks();
+              blocks.add(block);
+              nodeCapacity += thisBlockSize;
+              if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug(
+                    "First Assignment iteration: " + ((TableBlockInfo) 
block).getFilePath() + '-'
+                        + ((TableBlockInfo) block).getBlockLength() + "-->" + 
activeExecutor);
+              }
+              remainingBlocks.remove(block);
+            }
+            // this block is too big for current node and there are still 
capacity left
+            // for small files, so continue to allocate block on this node in 
next iteration.
           } else {
             // No need to continue loop as node is full
             break;
           }
+        } else {
+          throw new IllegalArgumentException(
+              "Unsupported block assignment strategy: " + 
blockAssignmentStrategy);
         }
       }
     }
@@ -799,60 +964,37 @@ public final class CarbonLoaderUtil {
   }
 
   /**
-   * Create the Node and its related blocks Mapping and put in a Map
+   * Create node to blocks mapping
    *
-   * @param flattenedList
-   * @param nodeAndBlockMapping
+   * @param blockInfos input block info
    */
-  private static void createNodeVsBlockMapping(List<NodeBlockRelation> 
flattenedList,
-      Map<String, List<Distributable>> nodeAndBlockMapping) {
-    for (NodeBlockRelation nbr : flattenedList) {
-      String node = nbr.getNode();
-      List<Distributable> list;
-
-      if (null == nodeAndBlockMapping.get(node)) {
-        list = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-        list.add(nbr.getBlock());
-        nodeAndBlockMapping.put(node, list);
-      } else {
-        list = nodeAndBlockMapping.get(node);
-        list.add(nbr.getBlock());
-      }
-    }
-    /*for resolving performance issue, removed values() with entrySet () 
iterating the values and
-    sorting it.entrySet will give the logical view for hashMap and we dont 
query the map twice for
-    each key whereas values () iterate twice*/
-    Iterator<Map.Entry<String, List<Distributable>>> iterator =
-        nodeAndBlockMapping.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Collections.sort(iterator.next().getValue());
-    }
-  }
+  private static ArrayList<NodeMultiBlockRelation> createNode2BlocksMapping(
+      List<Distributable> blockInfos) {
+    Map<String, Integer> node2Idx = new HashMap<>();
+    ArrayList<NodeMultiBlockRelation> node2Blocks = new ArrayList<>();
 
-  /**
-   * Create the flat List i.e flattening of the Map.
-   *
-   * @param blockInfos
-   * @param flattenedList
-   * @param uniqueBlocks
-   */
-  private static void createFlattenedListFromMap(List<Distributable> 
blockInfos,
-      List<NodeBlockRelation> flattenedList, Set<Distributable> uniqueBlocks,
-      Set<String> nodeList) {
     for (Distributable blockInfo : blockInfos) {
-      // put the blocks in the set
-      uniqueBlocks.add(blockInfo);
-
       try {
-        for (String eachNode : blockInfo.getLocations()) {
-          NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode);
-          flattenedList.add(nbr);
-          nodeList.add(eachNode);
+        for (final String eachNode : blockInfo.getLocations()) {
+          if (node2Idx.containsKey(eachNode)) {
+            Integer idx = node2Idx.get(eachNode);
+            List<Distributable> blocks = node2Blocks.get(idx).getBlocks();
+            blocks.add(blockInfo);
+          } else {
+            // add blocks to this node for the first time
+            Integer idx = node2Idx.size();
+            List<Distributable> blocks = new ArrayList<>();
+            blocks.add(blockInfo);
+            node2Blocks.add(idx, new NodeMultiBlockRelation(eachNode, blocks));
+            node2Idx.put(eachNode, idx);
+          }
         }
       } catch (IOException e) {
         throw new RuntimeException("error getting location of block: " + 
blockInfo.toString(), e);
       }
     }
+
+    return node2Blocks;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/132fbd4d/processing/src/test/java/org/apache/carbondata/processing/util/CarbonLoaderUtilTest.java
----------------------------------------------------------------------
diff --git 
a/processing/src/test/java/org/apache/carbondata/processing/util/CarbonLoaderUtilTest.java
 
b/processing/src/test/java/org/apache/carbondata/processing/util/CarbonLoaderUtilTest.java
new file mode 100644
index 0000000..9c66ada
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/carbondata/processing/util/CarbonLoaderUtilTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CarbonLoaderUtilTest {
+  private final static LogService LOGGER
+      = LogServiceFactory.getLogService(CarbonLoaderUtilTest.class.getName());
+
+  private List<Distributable> generateBlocks() {
+    List<Distributable> blockInfos = new ArrayList<>();
+    String filePath = "/fakepath";
+    String blockId = "1";
+
+    String[] locations = new String[] { "host2", "host3" };
+    ColumnarFormatVersion version = ColumnarFormatVersion.V1;
+
+    TableBlockInfo tableBlockInfo1 = new TableBlockInfo(filePath + "_a", 0,
+        blockId, locations, 30 * 1024 * 1024, version, null);
+    blockInfos.add(tableBlockInfo1);
+
+    TableBlockInfo tableBlockInfo2 = new TableBlockInfo(filePath + "_b", 0,
+        blockId, locations, 40 * 1024 * 1024, version, null);
+    blockInfos.add(tableBlockInfo2);
+
+    TableBlockInfo tableBlockInfo3 = new TableBlockInfo(filePath + "_c", 0,
+        blockId, locations, 20 * 1024 * 1024, version, null);
+    blockInfos.add(tableBlockInfo3);
+
+    TableBlockInfo tableBlockInfo4 = new TableBlockInfo(filePath + "_d", 0,
+        blockId, locations, 1, version, null);
+    blockInfos.add(tableBlockInfo4);
+
+    TableBlockInfo tableBlockInfo5 = new TableBlockInfo(filePath + "_e", 0,
+        blockId, locations, 1, version, null);
+    blockInfos.add(tableBlockInfo5);
+
+    TableBlockInfo tableBlockInfo6 = new TableBlockInfo(filePath + "_f", 0,
+        blockId, locations, 1, version, null);
+    blockInfos.add(tableBlockInfo6);
+
+    TableBlockInfo tableBlockInfo7 = new TableBlockInfo(filePath + "_g", 0,
+        blockId, locations, 1, version, null);
+    blockInfos.add(tableBlockInfo7);
+    return blockInfos;
+  }
+
+  private List<String> generateExecutors() {
+    List<String> activeNodes = new ArrayList<>();
+    activeNodes.add("host1");
+    activeNodes.add("host2");
+    activeNodes.add("host3");
+    return activeNodes;
+  }
+
+  @Test
+  public void testNodeBlockMappingByDataSize() throws Exception {
+    List<Distributable> blockInfos = generateBlocks();
+    List<String> activeNodes = generateExecutors();
+
+    // the blocks are assigned by size, so the number of block for each node 
are different
+    Map<String, List<Distributable>> nodeMappingBySize =
+        CarbonLoaderUtil.nodeBlockMapping(blockInfos, -1, activeNodes,
+            CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_SIZE_FIRST);
+    LOGGER.info(convertMapListAsString(nodeMappingBySize));
+    Assert.assertEquals(3, nodeMappingBySize.size());
+    for (Map.Entry<String, List<Distributable>> entry : 
nodeMappingBySize.entrySet()) {
+      if (entry.getValue().size() == 1) {
+        // only contains the biggest block
+        Assert.assertEquals(40 * 1024 * 1024L,
+            ((TableBlockInfo) entry.getValue().get(0)).getBlockLength());
+      } else {
+        Assert.assertTrue(entry.getValue().size() > 1);
+      }
+    }
+
+    // the blocks are assigned by number, so the number of blocks for each 
node are nearly the same
+    Map<String, List<Distributable>> nodeMappingByNum =
+        CarbonLoaderUtil.nodeBlockMapping(blockInfos, -1, activeNodes,
+            CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST);
+    LOGGER.info(convertMapListAsString(nodeMappingByNum));
+    Assert.assertEquals(3, nodeMappingBySize.size());
+    for (Map.Entry<String, List<Distributable>> entry : 
nodeMappingByNum.entrySet()) {
+      Assert.assertTrue(entry.getValue().size() == blockInfos.size() / 3
+          || entry.getValue().size() == blockInfos.size() / 3 + 1);
+    }
+  }
+
+  private <K, T> String convertMapListAsString(Map<K, List<T>> mapList) {
+    StringBuffer sb = new StringBuffer();
+    for (Map.Entry<K, List<T>> entry : mapList.entrySet()) {
+      String key = entry.getKey().toString();
+      String value = StringUtils.join(entry.getValue(), ", ");
+      sb.append(key).append(" -- 
").append(value).append(System.lineSeparator());
+    }
+    return sb.toString();
+  }
+}
\ No newline at end of file

Reply via email to