[14/54] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

2018-03-08 Thread ravipesala
[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

Carbondata assigns blocks to nodes at the beginning of data loading.
The previous block allocation strategy was block-number based, and it would
suffer from the skewed data problem if the sizes of the input files differ a lot.

We introduced a size-based block allocation strategy to optimize data
loading performance in skewed data scenarios.

This closes #1808


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9a423c24
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9a423c24
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9a423c24

Branch: refs/heads/master
Commit: 9a423c241b01d6e0b3f1946c052230adb47f4538
Parents: 2b41f14
Author: xuchuanyin 
Authored: Thu Feb 8 14:42:39 2018 +0800
Committer: ravipesala 
Committed: Thu Mar 8 22:21:10 2018 +0530

--
 .../constants/CarbonLoadOptionConstants.java|  10 +
 .../core/datastore/block/TableBlockInfo.java|  29 ++
 .../carbondata/core/util/CarbonProperties.java  |  11 +
 docs/useful-tips-on-carbondata.md   |   1 +
 .../spark/rdd/NewCarbonDataLoadRDD.scala|   4 +-
 .../spark/sql/hive/DistributionUtil.scala   |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala|  18 +-
 .../merger/NodeMultiBlockRelation.java  |  40 ++
 .../processing/util/CarbonLoaderUtil.java   | 480 ---
 .../processing/util/CarbonLoaderUtilTest.java   | 125 +
 10 files changed, 545 insertions(+), 175 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a423c24/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index bcfeba0..a6bf60f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants {
*/
   public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000;
 
+  /**
+   * enable block size based block allocation while loading data. By default, 
carbondata assigns
+   * blocks to node based on block number. If this option is set to `true`, 
carbondata will
+   * consider block size first and make sure that all the nodes will process 
almost equal size of
+   * data. This option is especially useful when you encounter skewed data.
+   */
+  @CarbonProperty
+  public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
+  = "carbon.load.skewedDataOptimization.enabled";
+  public static final String 
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a423c24/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index a7bfdba..c0cebe0 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, 
Serializable {
 
   private String dataMapWriterPath;
 
+  /**
+   * comparator to sort by block size in descending order.
+   * Since each line is not exactly the same, the size of a InputSplit may 
differs,
+   * so we allow some deviation for these splits.
+   */
+  public static final Comparator DATA_SIZE_DESC_COMPARATOR =
+  new Comparator() {
+@Override public int compare(Distributable o1, Distributable o2) {
+  long diff =
+  ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) 
o2).getBlockLength();
+  return diff < 0 ? 1 : (diff == 0 ? 0 : -1);
+}
+  };
+
   public TableBlockInfo(String filePath, long blockOffset, String segmentId,
   String[] locations, long blockLength, ColumnarFormatVersion version,
   String[] deletedDeltaFilePath) {
@@ -434,4 +450,17 @@ public class TableBlockInfo 

[21/54] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

2018-03-08 Thread ravipesala
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
--
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
index 7ea5cb3..e5583c2 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
@@ -19,20 +19,35 @@ package 
org.apache.carbondata.processing.loading.sort.unsafe;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.math.BigDecimal;
+import java.util.Arrays;
 
+import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.IntPointerBuffer;
 import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
-import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
-import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
  * It can keep the data of prescribed size data in offheap/onheap memory and 
returns it when needed
  */
 public class UnsafeCarbonRowPage {
+
+  private boolean[] noDictionaryDimensionMapping;
+
+  private boolean[] noDictionarySortColumnMapping;
+
+  private int dimensionSize;
+
+  private int measureSize;
+
+  private DataType[] measureDataType;
+
+  private long[] nullSetWords;
+
   private IntPointerBuffer buffer;
 
   private int lastSize;
@@ -47,14 +62,16 @@ public class UnsafeCarbonRowPage {
 
   private long taskId;
 
-  private TableFieldStat tableFieldStat;
-  private SortStepRowHandler sortStepRowHandler;
-
-  public UnsafeCarbonRowPage(TableFieldStat tableFieldStat, MemoryBlock 
memoryBlock,
-  boolean saveToDisk, long taskId) {
-this.tableFieldStat = tableFieldStat;
-this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
+  public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping,
+  boolean[] noDictionarySortColumnMapping, int dimensionSize, int 
measureSize, DataType[] type,
+  MemoryBlock memoryBlock, boolean saveToDisk, long taskId) {
+this.noDictionaryDimensionMapping = noDictionaryDimensionMapping;
+this.noDictionarySortColumnMapping = noDictionarySortColumnMapping;
+this.dimensionSize = dimensionSize;
+this.measureSize = measureSize;
+this.measureDataType = type;
 this.saveToDisk = saveToDisk;
+this.nullSetWords = new long[((measureSize - 1) >> 6) + 1];
 this.taskId = taskId;
 buffer = new IntPointerBuffer(this.taskId);
 this.dataBlock = memoryBlock;
@@ -63,44 +80,255 @@ public class UnsafeCarbonRowPage {
 this.managerType = MemoryManagerType.UNSAFE_MEMORY_MANAGER;
   }
 
-  public int addRow(Object[] row, ByteBuffer rowBuffer) {
-int size = addRow(row, dataBlock.getBaseOffset() + lastSize, rowBuffer);
+  public int addRow(Object[] row) {
+int size = addRow(row, dataBlock.getBaseOffset() + lastSize);
 buffer.set(lastSize);
 lastSize = lastSize + size;
 return size;
   }
 
-  /**
-   * add raw row as intermidiate sort temp row to page
-   *
-   * @param row
-   * @param address
-   * @return
-   */
-  private int addRow(Object[] row, long address, ByteBuffer rowBuffer) {
-return 
sortStepRowHandler.writeRawRowAsIntermediateSortTempRowToUnsafeMemory(row,
-dataBlock.getBaseObject(), address, rowBuffer);
+  private int addRow(Object[] row, long address) {
+if (row == null) {
+  throw new RuntimeException("Row is null ??");
+}
+int dimCount = 0;
+int size = 0;
+Object baseObject = dataBlock.getBaseObject();
+for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
+  if (noDictionaryDimensionMapping[dimCount]) {
+byte[] col = (byte[]) row[dimCount];
+CarbonUnsafe.getUnsafe()
+.putShort(baseObject, address + size, (short) col.length);
+size += 2;
+CarbonUnsafe.getUnsafe().copyMemory(col, 
CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+address + size, col.length);
+size += col.length;
+  } else {
+int value = (int) row[dimCount];
+CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, value);
+size += 4;
+  }
+}
+
+// write complex dimensions here.
+for (; dimCount < dimensionSize; dimCount++) {
+ 

[22/54] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

2018-03-08 Thread ravipesala
[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

Carbondata assigns blocks to nodes at the beginning of data loading.
The previous block allocation strategy was block-number based, and it would
suffer from the skewed data problem if the sizes of the input files differ a lot.

We introduced a size-based block allocation strategy to optimize data
loading performance in skewed data scenarios.

This closes #1808


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8d8b589e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8d8b589e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8d8b589e

Branch: refs/heads/master
Commit: 8d8b589e78a9db1ddc101d20c1e3feb500acce19
Parents: 21704cf
Author: xuchuanyin 
Authored: Thu Feb 8 14:42:39 2018 +0800
Committer: ravipesala 
Committed: Thu Mar 8 22:21:10 2018 +0530

--
 .../core/datamap/dev/AbstractDataMapWriter.java |   5 +-
 .../core/datamap/dev/DataMapFactory.java|   2 +-
 .../blockletindex/BlockletDataMapFactory.java   |   2 +-
 .../SegmentUpdateStatusManager.java |   9 +-
 .../carbondata/core/util/NonDictionaryUtil.java |  67 ++-
 .../datamap/examples/MinMaxDataMapFactory.java  |   5 +-
 .../datamap/examples/MinMaxDataWriter.java  |   7 +-
 .../presto/util/CarbonDataStoreCreator.scala|   1 +
 .../CarbonIndexFileMergeTestCase.scala  |   4 -
 .../testsuite/datamap/CGDataMapTestCase.scala   |  26 +-
 .../testsuite/datamap/DataMapWriterSuite.scala  |  19 +-
 .../testsuite/datamap/FGDataMapTestCase.scala   |  31 +-
 .../iud/DeleteCarbonTableTestCase.scala |   2 +-
 .../TestInsertAndOtherCommandConcurrent.scala   |  14 +-
 .../StandardPartitionTableCleanTestCase.scala   |  12 +-
 .../StandardPartitionTableLoadingTestCase.scala |   2 +-
 .../load/DataLoadProcessorStepOnSpark.scala |   6 +-
 .../carbondata/spark/util/DataLoadingUtil.scala |   2 +-
 .../datamap/DataMapWriterListener.java  |   2 +-
 .../loading/row/IntermediateSortTempRow.java| 117 -
 .../loading/sort/SortStepRowHandler.java| 466 ---
 .../loading/sort/SortStepRowUtil.java   | 103 
 .../sort/unsafe/UnsafeCarbonRowPage.java| 331 +++--
 .../loading/sort/unsafe/UnsafeSortDataRows.java |  57 ++-
 .../unsafe/comparator/UnsafeRowComparator.java  |  95 ++--
 .../UnsafeRowComparatorForNormalDIms.java   |  59 +++
 .../UnsafeRowComparatorForNormalDims.java   |  59 ---
 .../sort/unsafe/holder/SortTempChunkHolder.java |   3 +-
 .../holder/UnsafeFinalMergePageHolder.java  |  19 +-
 .../unsafe/holder/UnsafeInmemoryHolder.java |  21 +-
 .../holder/UnsafeSortTempFileChunkHolder.java   | 138 --
 .../merger/UnsafeIntermediateFileMerger.java| 118 -
 .../UnsafeSingleThreadFinalSortFilesMerger.java |  27 +-
 .../processing/merger/CarbonDataMergerUtil.java |   8 +-
 .../merger/CompactionResultSortProcessor.java   |   5 +-
 .../merger/RowResultMergerProcessor.java|   5 +-
 .../partition/spliter/RowResultProcessor.java   |   5 +-
 .../sort/sortdata/IntermediateFileMerger.java   |  95 +++-
 .../IntermediateSortTempRowComparator.java  |  73 ---
 .../sort/sortdata/NewRowComparator.java |   5 +-
 .../sortdata/NewRowComparatorForNormalDims.java |   3 +-
 .../processing/sort/sortdata/RowComparator.java |  94 
 .../sortdata/RowComparatorForNormalDims.java|  62 +++
 .../SingleThreadFinalSortFilesMerger.java   |  25 +-
 .../processing/sort/sortdata/SortDataRows.java  |  85 +++-
 .../sort/sortdata/SortTempFileChunkHolder.java  | 174 +--
 .../sort/sortdata/TableFieldStat.java   | 176 ---
 .../util/CarbonDataProcessorUtil.java   |   4 +-
 .../processing/util/CarbonLoaderUtil.java   |   9 -
 49 files changed, 1368 insertions(+), 1291 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
index bcc9bad..de6dcb1 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.datamap.dev;
 
 import java.io.IOException;
 
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import 

[20/54] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

2018-03-08 Thread ravipesala
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java
--
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java
new file mode 100644
index 000..0ae0b93
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
+
+public class RowComparator implements Comparator {
+  /**
+   * noDictionaryCount represent number of no dictionary cols
+   */
+  private int noDictionaryCount;
+
+  /**
+   * noDictionaryColMaping mapping of dictionary dimensions and no dictionary 
dimensions.
+   */
+  private boolean[] noDictionarySortColumnMaping;
+
+  /**
+   * @param noDictionarySortColumnMaping
+   * @param noDictionaryCount
+   */
+  public RowComparator(boolean[] noDictionarySortColumnMaping, int 
noDictionaryCount) {
+this.noDictionaryCount = noDictionaryCount;
+this.noDictionarySortColumnMaping = noDictionarySortColumnMaping;
+  }
+
+  /**
+   * Below method will be used to compare two mdkey
+   */
+  public int compare(Object[] rowA, Object[] rowB) {
+int diff = 0;
+
+int normalIndex = 0;
+int noDictionaryindex = 0;
+
+for (boolean isNoDictionary : noDictionarySortColumnMaping) {
+
+  if (isNoDictionary) {
+byte[] byteArr1 = (byte[]) 
rowA[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
+
+ByteBuffer buff1 = ByteBuffer.wrap(byteArr1);
+
+// extract a high card dims from complete byte[].
+NonDictionaryUtil
+.extractSingleHighCardDims(byteArr1, noDictionaryindex, 
noDictionaryCount, buff1);
+
+byte[] byteArr2 = (byte[]) 
rowB[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
+
+ByteBuffer buff2 = ByteBuffer.wrap(byteArr2);
+
+// extract a high card dims from complete byte[].
+NonDictionaryUtil
+.extractSingleHighCardDims(byteArr2, noDictionaryindex, 
noDictionaryCount, buff2);
+
+int difference = UnsafeComparer.INSTANCE.compareTo(buff1, buff2);
+if (difference != 0) {
+  return difference;
+}
+noDictionaryindex++;
+  } else {
+int dimFieldA = NonDictionaryUtil.getDimension(normalIndex, rowA);
+int dimFieldB = NonDictionaryUtil.getDimension(normalIndex, rowB);
+diff = dimFieldA - dimFieldB;
+if (diff != 0) {
+  return diff;
+}
+normalIndex++;
+  }
+
+}
+
+return diff;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java
--
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java
 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java
new file mode 100644
index 000..0883ae1
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless 

[34/50] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

2018-03-04 Thread jackylk
[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

Carbondata assigns blocks to nodes at the beginning of data loading.
The previous block allocation strategy was block-number based, and it would
suffer from the skewed data problem if the sizes of the input files differ a lot.

We introduced a size-based block allocation strategy to optimize data
loading performance in skewed data scenarios.

This closes #1808


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8fe8ab4c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8fe8ab4c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8fe8ab4c

Branch: refs/heads/carbonstore
Commit: 8fe8ab4c078de0ccd218f8ba41352896aebd5202
Parents: 28b5720
Author: xuchuanyin 
Authored: Thu Feb 8 14:42:39 2018 +0800
Committer: Jacky Li 
Committed: Sun Mar 4 20:32:13 2018 +0800

--
 .../constants/CarbonLoadOptionConstants.java|  10 +
 .../core/datastore/block/TableBlockInfo.java|  29 ++
 .../carbondata/core/util/CarbonProperties.java  |  11 +
 docs/useful-tips-on-carbondata.md   |   1 +
 .../spark/rdd/NewCarbonDataLoadRDD.scala|   4 +-
 .../spark/sql/hive/DistributionUtil.scala   |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala|  18 +-
 .../merger/NodeMultiBlockRelation.java  |  40 ++
 .../processing/util/CarbonLoaderUtil.java   | 480 ---
 .../processing/util/CarbonLoaderUtilTest.java   | 125 +
 10 files changed, 545 insertions(+), 175 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fe8ab4c/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index bcfeba0..a6bf60f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants {
*/
   public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000;
 
+  /**
+   * enable block size based block allocation while loading data. By default, 
carbondata assigns
+   * blocks to node based on block number. If this option is set to `true`, 
carbondata will
+   * consider block size first and make sure that all the nodes will process 
almost equal size of
+   * data. This option is especially useful when you encounter skewed data.
+   */
+  @CarbonProperty
+  public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
+  = "carbon.load.skewedDataOptimization.enabled";
+  public static final String 
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fe8ab4c/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index a7bfdba..c0cebe0 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, 
Serializable {
 
   private String dataMapWriterPath;
 
+  /**
+   * comparator to sort by block size in descending order.
+   * Since each line is not exactly the same, the size of a InputSplit may 
differs,
+   * so we allow some deviation for these splits.
+   */
+  public static final Comparator DATA_SIZE_DESC_COMPARATOR =
+  new Comparator() {
+@Override public int compare(Distributable o1, Distributable o2) {
+  long diff =
+  ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) 
o2).getBlockLength();
+  return diff < 0 ? 1 : (diff == 0 ? 0 : -1);
+}
+  };
+
   public TableBlockInfo(String filePath, long blockOffset, String segmentId,
   String[] locations, long blockLength, ColumnarFormatVersion version,
   String[] deletedDeltaFilePath) {
@@ -434,4 +450,17 @@ public class TableBlockInfo 

[25/50] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

2018-03-04 Thread jackylk
[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

Carbondata assigns blocks to nodes at the beginning of data loading.
The previous block allocation strategy was block-number based, and it would
suffer from the skewed data problem if the sizes of the input files differ a lot.

We introduced a size-based block allocation strategy to optimize data
loading performance in skewed data scenarios.

This closes #1808


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3fdd5d0f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3fdd5d0f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3fdd5d0f

Branch: refs/heads/carbonstore
Commit: 3fdd5d0f567e8d07cc502202ced7d490fa85e2ad
Parents: 0bb4aed
Author: xuchuanyin 
Authored: Thu Feb 8 14:42:39 2018 +0800
Committer: Jacky Li 
Committed: Sun Mar 4 20:32:12 2018 +0800

--
 .../constants/CarbonLoadOptionConstants.java|  10 +
 .../core/datastore/block/TableBlockInfo.java|  29 ++
 .../carbondata/core/util/CarbonProperties.java  |  11 +
 docs/useful-tips-on-carbondata.md   |   1 +
 .../CarbonIndexFileMergeTestCase.scala  |   4 -
 .../StandardPartitionTableLoadingTestCase.scala |   2 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala|   4 +-
 .../spark/sql/hive/DistributionUtil.scala   |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala|  18 +-
 .../merger/NodeMultiBlockRelation.java  |  40 ++
 .../processing/util/CarbonLoaderUtil.java   | 494 ---
 .../processing/util/CarbonLoaderUtilTest.java   | 125 +
 12 files changed, 552 insertions(+), 188 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3fdd5d0f/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index bcfeba0..a6bf60f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants {
*/
   public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000;
 
+  /**
+   * enable block size based block allocation while loading data. By default, 
carbondata assigns
+   * blocks to node based on block number. If this option is set to `true`, 
carbondata will
+   * consider block size first and make sure that all the nodes will process 
almost equal size of
+   * data. This option is especially useful when you encounter skewed data.
+   */
+  @CarbonProperty
+  public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
+  = "carbon.load.skewedDataOptimization.enabled";
+  public static final String 
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3fdd5d0f/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index a7bfdba..c0cebe0 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, 
Serializable {
 
   private String dataMapWriterPath;
 
+  /**
+   * comparator to sort by block size in descending order.
+   * Since each line is not exactly the same, the size of a InputSplit may 
differs,
+   * so we allow some deviation for these splits.
+   */
+  public static final Comparator DATA_SIZE_DESC_COMPARATOR =
+  new Comparator() {
+@Override public int compare(Distributable o1, Distributable o2) {
+  long diff =
+  ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) 
o2).getBlockLength();
+  return diff < 0 ? 1 : (diff == 0 ? 0 : -1);
+}
+  };
+
   public TableBlockInfo(String filePath, long blockOffset, String segmentId,
   String[] locations, long blockLength, 

[25/50] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

2018-03-04 Thread jackylk
[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

Carbondata assigns blocks to nodes at the beginning of data loading.
The previous block allocation strategy was block-number based, and it would
suffer from the skewed data problem if the sizes of the input files differ a lot.

We introduced a size-based block allocation strategy to optimize data
loading performance in skewed data scenarios.

This closes #1808


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3fdd5d0f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3fdd5d0f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3fdd5d0f

Branch: refs/heads/carbonstore-rebase5
Commit: 3fdd5d0f567e8d07cc502202ced7d490fa85e2ad
Parents: 0bb4aed
Author: xuchuanyin 
Authored: Thu Feb 8 14:42:39 2018 +0800
Committer: Jacky Li 
Committed: Sun Mar 4 20:32:12 2018 +0800

--
 .../constants/CarbonLoadOptionConstants.java|  10 +
 .../core/datastore/block/TableBlockInfo.java|  29 ++
 .../carbondata/core/util/CarbonProperties.java  |  11 +
 docs/useful-tips-on-carbondata.md   |   1 +
 .../CarbonIndexFileMergeTestCase.scala  |   4 -
 .../StandardPartitionTableLoadingTestCase.scala |   2 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala|   4 +-
 .../spark/sql/hive/DistributionUtil.scala   |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala|  18 +-
 .../merger/NodeMultiBlockRelation.java  |  40 ++
 .../processing/util/CarbonLoaderUtil.java   | 494 ---
 .../processing/util/CarbonLoaderUtilTest.java   | 125 +
 12 files changed, 552 insertions(+), 188 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3fdd5d0f/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index bcfeba0..a6bf60f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants {
*/
   public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000;
 
+  /**
+   * enable block size based block allocation while loading data. By default, 
carbondata assigns
+   * blocks to node based on block number. If this option is set to `true`, 
carbondata will
+   * consider block size first and make sure that all the nodes will process 
almost equal size of
+   * data. This option is especially useful when you encounter skewed data.
+   */
+  @CarbonProperty
+  public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
+  = "carbon.load.skewedDataOptimization.enabled";
+  public static final String 
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3fdd5d0f/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index a7bfdba..c0cebe0 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, 
Serializable {
 
   private String dataMapWriterPath;
 
+  /**
+   * comparator to sort by block size in descending order.
+   * Since each line is not exactly the same, the size of a InputSplit may 
differs,
+   * so we allow some deviation for these splits.
+   */
+  public static final Comparator DATA_SIZE_DESC_COMPARATOR =
+  new Comparator() {
+@Override public int compare(Distributable o1, Distributable o2) {
+  long diff =
+  ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) 
o2).getBlockLength();
+  return diff < 0 ? 1 : (diff == 0 ? 0 : -1);
+}
+  };
+
   public TableBlockInfo(String filePath, long blockOffset, String segmentId,
   String[] locations, long blockLength, 

[34/50] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

2018-03-04 Thread jackylk
[CARBONDATA-2023][DataLoad] Add size-based block allocation in data loading

Carbondata assigns blocks to nodes at the beginning of data loading.
The previous block allocation strategy was block-number based, and it
suffers from a skewed-data problem if the sizes of the input files differ a lot.

We introduced a size-based block allocation strategy to optimize data
loading performance in the skewed-data scenario.

This closes #1808


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8fe8ab4c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8fe8ab4c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8fe8ab4c

Branch: refs/heads/carbonstore-rebase5
Commit: 8fe8ab4c078de0ccd218f8ba41352896aebd5202
Parents: 28b5720
Author: xuchuanyin 
Authored: Thu Feb 8 14:42:39 2018 +0800
Committer: Jacky Li 
Committed: Sun Mar 4 20:32:13 2018 +0800

--
 .../constants/CarbonLoadOptionConstants.java|  10 +
 .../core/datastore/block/TableBlockInfo.java|  29 ++
 .../carbondata/core/util/CarbonProperties.java  |  11 +
 docs/useful-tips-on-carbondata.md   |   1 +
 .../spark/rdd/NewCarbonDataLoadRDD.scala|   4 +-
 .../spark/sql/hive/DistributionUtil.scala   |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala|  18 +-
 .../merger/NodeMultiBlockRelation.java  |  40 ++
 .../processing/util/CarbonLoaderUtil.java   | 480 ---
 .../processing/util/CarbonLoaderUtilTest.java   | 125 +
 10 files changed, 545 insertions(+), 175 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fe8ab4c/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index bcfeba0..a6bf60f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants {
*/
   public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000;
 
+  /**
+   * enable block size based block allocation while loading data. By default, 
carbondata assigns
+   * blocks to node based on block number. If this option is set to `true`, 
carbondata will
+   * consider block size first and make sure that all the nodes will process 
almost equal size of
+   * data. This option is especially useful when you encounter skewed data.
+   */
+  @CarbonProperty
+  public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
+  = "carbon.load.skewedDataOptimization.enabled";
+  public static final String 
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fe8ab4c/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index a7bfdba..c0cebe0 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, 
Serializable {
 
   private String dataMapWriterPath;
 
+  /**
+   * comparator to sort by block size in descending order.
+   * Since each line is not exactly the same, the size of a InputSplit may 
differs,
+   * so we allow some deviation for these splits.
+   */
+  public static final Comparator DATA_SIZE_DESC_COMPARATOR =
+  new Comparator() {
+@Override public int compare(Distributable o1, Distributable o2) {
+  long diff =
+  ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) 
o2).getBlockLength();
+  return diff < 0 ? 1 : (diff == 0 ? 0 : -1);
+}
+  };
+
   public TableBlockInfo(String filePath, long blockOffset, String segmentId,
   String[] locations, long blockLength, ColumnarFormatVersion version,
   String[] deletedDeltaFilePath) {
@@ -434,4 +450,17 @@ public class TableBlockInfo 

[32/50] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

2018-03-04 Thread jackylk
[CARBONDATA-2023][DataLoad] Add size-based block allocation in data loading

Carbondata assigns blocks to nodes at the beginning of data loading.
The previous block allocation strategy was block-number based, and it
suffers from a skewed-data problem if the sizes of the input files differ a lot.

We introduced a size-based block allocation strategy to optimize data
loading performance in the skewed-data scenario.

This closes #1808


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/111bb5c4
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/111bb5c4
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/111bb5c4

Branch: refs/heads/carbonstore-rebase5
Commit: 111bb5c41946d72604964658ec8562f7722dec14
Parents: ef81248
Author: xuchuanyin 
Authored: Thu Feb 8 14:42:39 2018 +0800
Committer: Jacky Li 
Committed: Sun Mar 4 19:54:03 2018 +0800

--
 .../constants/CarbonLoadOptionConstants.java|  10 +
 .../core/datastore/block/TableBlockInfo.java|  29 ++
 .../carbondata/core/util/CarbonProperties.java  |  11 +
 docs/useful-tips-on-carbondata.md   |   1 +
 .../CarbonIndexFileMergeTestCase.scala  |   4 -
 .../StandardPartitionTableLoadingTestCase.scala |   2 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala|   4 +-
 .../spark/sql/hive/DistributionUtil.scala   |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala|  18 +-
 .../merger/NodeMultiBlockRelation.java  |  40 ++
 .../processing/util/CarbonLoaderUtil.java   | 494 ---
 .../processing/util/CarbonLoaderUtilTest.java   | 125 +
 12 files changed, 552 insertions(+), 188 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/carbondata/blob/111bb5c4/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index bcfeba0..a6bf60f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants {
*/
   public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000;
 
+  /**
+   * enable block size based block allocation while loading data. By default, 
carbondata assigns
+   * blocks to node based on block number. If this option is set to `true`, 
carbondata will
+   * consider block size first and make sure that all the nodes will process 
almost equal size of
+   * data. This option is especially useful when you encounter skewed data.
+   */
+  @CarbonProperty
+  public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
+  = "carbon.load.skewedDataOptimization.enabled";
+  public static final String 
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/111bb5c4/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index a7bfdba..c0cebe0 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, 
Serializable {
 
   private String dataMapWriterPath;
 
+  /**
+   * comparator to sort by block size in descending order.
+   * Since each line is not exactly the same, the size of a InputSplit may 
differs,
+   * so we allow some deviation for these splits.
+   */
+  public static final Comparator DATA_SIZE_DESC_COMPARATOR =
+  new Comparator() {
+@Override public int compare(Distributable o1, Distributable o2) {
+  long diff =
+  ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) 
o2).getBlockLength();
+  return diff < 0 ? 1 : (diff == 0 ? 0 : -1);
+}
+  };
+
   public TableBlockInfo(String filePath, long blockOffset, String segmentId,
   String[] locations, long blockLength, 

[27/50] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

2018-03-02 Thread jackylk
[CARBONDATA-2023][DataLoad] Add size-based block allocation in data loading

Carbondata assigns blocks to nodes at the beginning of data loading.
The previous block allocation strategy was block-number based, and it
suffers from a skewed-data problem if the sizes of the input files differ a lot.

We introduced a size-based block allocation strategy to optimize data
loading performance in the skewed-data scenario.

This closes #1808


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b5908d2c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b5908d2c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b5908d2c

Branch: refs/heads/carbonstore-rebase5
Commit: b5908d2c100e66373670a40d1c785a0d932489ce
Parents: 65daaca
Author: xuchuanyin 
Authored: Thu Feb 8 14:42:39 2018 +0800
Committer: Jacky Li 
Committed: Fri Mar 2 15:52:35 2018 +0800

--
 .../constants/CarbonLoadOptionConstants.java|  10 +
 .../core/datastore/block/TableBlockInfo.java|  29 ++
 .../carbondata/core/util/CarbonProperties.java  |  11 +
 docs/useful-tips-on-carbondata.md   |   1 +
 .../sdv/generated/MergeIndexTestCase.scala  |  11 +-
 .../CarbonIndexFileMergeTestCase.scala  |  19 +-
 .../StandardPartitionTableLoadingTestCase.scala |   2 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala|   4 +-
 .../spark/sql/hive/DistributionUtil.scala   |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala|  18 +-
 .../merger/NodeMultiBlockRelation.java  |  40 ++
 .../processing/util/CarbonLoaderUtil.java   | 494 ---
 .../processing/util/CarbonLoaderUtilTest.java   | 125 +
 13 files changed, 563 insertions(+), 203 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b5908d2c/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index bcfeba0..a6bf60f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants {
*/
   public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000;
 
+  /**
+   * enable block size based block allocation while loading data. By default, 
carbondata assigns
+   * blocks to node based on block number. If this option is set to `true`, 
carbondata will
+   * consider block size first and make sure that all the nodes will process 
almost equal size of
+   * data. This option is especially useful when you encounter skewed data.
+   */
+  @CarbonProperty
+  public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
+  = "carbon.load.skewedDataOptimization.enabled";
+  public static final String 
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b5908d2c/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index a7bfdba..c0cebe0 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, 
Serializable {
 
   private String dataMapWriterPath;
 
+  /**
+   * comparator to sort by block size in descending order.
+   * Since each line is not exactly the same, the size of a InputSplit may 
differs,
+   * so we allow some deviation for these splits.
+   */
+  public static final Comparator DATA_SIZE_DESC_COMPARATOR =
+  new Comparator() {
+@Override public int compare(Distributable o1, Distributable o2) {
+  long diff =
+  ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) 
o2).getBlockLength();
+  return diff < 0 ? 1 : (diff == 0 ? 0 : -1);
+}
+  };
+
   public TableBlockInfo(String filePath, long blockOffset, String 

[25/49] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

2018-02-28 Thread jackylk
[CARBONDATA-2023][DataLoad] Add size-based block allocation in data loading

Carbondata assigns blocks to nodes at the beginning of data loading.
The previous block allocation strategy was block-number based, and it
suffers from a skewed-data problem if the sizes of the input files differ a lot.

We introduced a size-based block allocation strategy to optimize data
loading performance in the skewed-data scenario.

This closes #1808


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7070f54b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7070f54b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7070f54b

Branch: refs/heads/carbonstore-rebase5
Commit: 7070f54b221c2b8120841fb283c2c69bde55ee8f
Parents: c270ebb
Author: xuchuanyin 
Authored: Thu Feb 8 14:42:39 2018 +0800
Committer: Jacky Li 
Committed: Wed Feb 28 22:05:21 2018 +0800

--
 .../constants/CarbonLoadOptionConstants.java|  10 +
 .../core/datastore/block/TableBlockInfo.java|  29 ++
 .../carbondata/core/util/CarbonProperties.java  |  11 +
 docs/useful-tips-on-carbondata.md   |   1 +
 .../sdv/generated/MergeIndexTestCase.scala  |  11 +-
 .../CarbonIndexFileMergeTestCase.scala  |  19 +-
 .../StandardPartitionTableLoadingTestCase.scala |   2 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala|   4 +-
 .../spark/sql/hive/DistributionUtil.scala   |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala|  18 +-
 .../merger/NodeMultiBlockRelation.java  |  40 ++
 .../processing/util/CarbonLoaderUtil.java   | 494 ---
 .../processing/util/CarbonLoaderUtilTest.java   | 125 +
 13 files changed, 563 insertions(+), 203 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7070f54b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index bcfeba0..a6bf60f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants {
*/
   public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000;
 
+  /**
+   * enable block size based block allocation while loading data. By default, 
carbondata assigns
+   * blocks to node based on block number. If this option is set to `true`, 
carbondata will
+   * consider block size first and make sure that all the nodes will process 
almost equal size of
+   * data. This option is especially useful when you encounter skewed data.
+   */
+  @CarbonProperty
+  public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
+  = "carbon.load.skewedDataOptimization.enabled";
+  public static final String 
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7070f54b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index a7bfdba..c0cebe0 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, 
Serializable {
 
   private String dataMapWriterPath;
 
+  /**
+   * comparator to sort by block size in descending order.
+   * Since each line is not exactly the same, the size of a InputSplit may 
differs,
+   * so we allow some deviation for these splits.
+   */
+  public static final Comparator DATA_SIZE_DESC_COMPARATOR =
+  new Comparator() {
+@Override public int compare(Distributable o1, Distributable o2) {
+  long diff =
+  ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) 
o2).getBlockLength();
+  return diff < 0 ? 1 : (diff == 0 ? 0 : -1);
+}
+  };
+
   public TableBlockInfo(String filePath, long blockOffset, String 

[45/50] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

2018-02-27 Thread jackylk
[CARBONDATA-2023][DataLoad] Add size-based block allocation in data loading

Carbondata assigns blocks to nodes at the beginning of data loading.
The previous block allocation strategy was block-number based, and it
suffers from a skewed-data problem if the sizes of the input files differ a lot.

We introduced a size-based block allocation strategy to optimize data
loading performance in the skewed-data scenario.

This closes #1808


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/bfc9fc66
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/bfc9fc66
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/bfc9fc66

Branch: refs/heads/carbonstore-rebase4
Commit: bfc9fc6697a09c6ee64df5db2c9df6a052a74528
Parents: d3289b8
Author: xuchuanyin 
Authored: Thu Feb 8 14:42:39 2018 +0800
Committer: Jacky Li 
Committed: Tue Feb 27 17:04:19 2018 +0800

--
 .../constants/CarbonLoadOptionConstants.java|  10 +
 .../core/datastore/block/TableBlockInfo.java|  29 ++
 .../carbondata/core/util/CarbonProperties.java  |  11 +
 docs/useful-tips-on-carbondata.md   |   1 +
 .../spark/rdd/NewCarbonDataLoadRDD.scala|   4 +-
 .../spark/sql/hive/DistributionUtil.scala   |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala|  18 +-
 .../merger/NodeMultiBlockRelation.java  |  40 ++
 .../processing/util/CarbonLoaderUtil.java   | 480 ---
 .../processing/util/CarbonLoaderUtilTest.java   | 125 +
 10 files changed, 545 insertions(+), 175 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/carbondata/blob/bfc9fc66/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index bcfeba0..a6bf60f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants {
*/
   public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000;
 
+  /**
+   * enable block size based block allocation while loading data. By default, 
carbondata assigns
+   * blocks to node based on block number. If this option is set to `true`, 
carbondata will
+   * consider block size first and make sure that all the nodes will process 
almost equal size of
+   * data. This option is especially useful when you encounter skewed data.
+   */
+  @CarbonProperty
+  public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
+  = "carbon.load.skewedDataOptimization.enabled";
+  public static final String 
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bfc9fc66/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index a7bfdba..c0cebe0 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, 
Serializable {
 
   private String dataMapWriterPath;
 
+  /**
+   * comparator to sort by block size in descending order.
+   * Since each line is not exactly the same, the size of a InputSplit may 
differs,
+   * so we allow some deviation for these splits.
+   */
+  public static final Comparator DATA_SIZE_DESC_COMPARATOR =
+  new Comparator() {
+@Override public int compare(Distributable o1, Distributable o2) {
+  long diff =
+  ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) 
o2).getBlockLength();
+  return diff < 0 ? 1 : (diff == 0 ? 0 : -1);
+}
+  };
+
   public TableBlockInfo(String filePath, long blockOffset, String segmentId,
   String[] locations, long blockLength, ColumnarFormatVersion version,
   String[] deletedDeltaFilePath) {
@@ -434,4 +450,17 @@ public class TableBlockInfo 

[26/50] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

2018-02-27 Thread jackylk
[CARBONDATA-2023][DataLoad] Add size-based block allocation in data loading

Carbondata assigns blocks to nodes at the beginning of data loading.
The previous block allocation strategy was block-number based, and it
suffers from a skewed-data problem if the sizes of the input files differ a lot.

We introduced a size-based block allocation strategy to optimize data
loading performance in the skewed-data scenario.

This closes #1808


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/25b09408
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/25b09408
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/25b09408

Branch: refs/heads/carbonstore-rebase4
Commit: 25b094081614b42ab9634e54c8b4446d1c8ef5c0
Parents: 3adb67d
Author: xuchuanyin 
Authored: Thu Feb 8 14:42:39 2018 +0800
Committer: Jacky Li 
Committed: Tue Feb 27 16:59:48 2018 +0800

--
 .../constants/CarbonLoadOptionConstants.java|  10 +
 .../core/datastore/block/TableBlockInfo.java|  29 ++
 .../carbondata/core/util/CarbonProperties.java  |  11 +
 docs/useful-tips-on-carbondata.md   |   1 +
 .../sdv/generated/MergeIndexTestCase.scala  |  11 +-
 .../CarbonIndexFileMergeTestCase.scala  |  19 +-
 .../StandardPartitionTableLoadingTestCase.scala |   2 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala|   4 +-
 .../spark/sql/hive/DistributionUtil.scala   |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala|  18 +-
 .../merger/NodeMultiBlockRelation.java  |  40 ++
 .../processing/util/CarbonLoaderUtil.java   | 494 ---
 .../processing/util/CarbonLoaderUtilTest.java   | 125 +
 13 files changed, 563 insertions(+), 203 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/carbondata/blob/25b09408/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index bcfeba0..a6bf60f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants {
*/
   public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000;
 
+  /**
+   * enable block size based block allocation while loading data. By default, 
carbondata assigns
+   * blocks to node based on block number. If this option is set to `true`, 
carbondata will
+   * consider block size first and make sure that all the nodes will process 
almost equal size of
+   * data. This option is especially useful when you encounter skewed data.
+   */
+  @CarbonProperty
+  public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
+  = "carbon.load.skewedDataOptimization.enabled";
+  public static final String 
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/25b09408/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index a7bfdba..c0cebe0 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, 
Serializable {
 
   private String dataMapWriterPath;
 
+  /**
+   * comparator to sort by block size in descending order.
+   * Since each line is not exactly the same, the size of a InputSplit may 
differs,
+   * so we allow some deviation for these splits.
+   */
+  public static final Comparator DATA_SIZE_DESC_COMPARATOR =
+  new Comparator() {
+@Override public int compare(Distributable o1, Distributable o2) {
+  long diff =
+  ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) 
o2).getBlockLength();
+  return diff < 0 ? 1 : (diff == 0 ? 0 : -1);
+}
+  };
+
   public TableBlockInfo(String filePath, long blockOffset, String 

[43/49] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

2018-02-26 Thread jackylk
[CARBONDATA-2023][DataLoad] Add size-based block allocation in data loading

Carbondata assigns blocks to nodes at the beginning of data loading.
The previous block allocation strategy was block-number based, and it
suffers from a skewed-data problem if the sizes of the input files differ a lot.

We introduced a size-based block allocation strategy to optimize data
loading performance in the skewed-data scenario.

This closes #1808


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d6d6f10b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d6d6f10b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d6d6f10b

Branch: refs/heads/carbonstore-rebase4
Commit: d6d6f10bc7e4e8ee5e8d1a9ee2b723ada37564a3
Parents: 93b2efd
Author: xuchuanyin 
Authored: Thu Feb 8 14:42:39 2018 +0800
Committer: Jacky Li 
Committed: Tue Feb 27 09:20:37 2018 +0800

--
 .../constants/CarbonLoadOptionConstants.java|  10 +
 .../core/datastore/block/TableBlockInfo.java|  29 ++
 .../carbondata/core/util/CarbonProperties.java  |  11 +
 docs/useful-tips-on-carbondata.md   |   1 +
 .../spark/rdd/NewCarbonDataLoadRDD.scala|   4 +-
 .../spark/sql/hive/DistributionUtil.scala   |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala|  18 +-
 .../merger/NodeMultiBlockRelation.java  |  40 ++
 .../processing/util/CarbonLoaderUtil.java   | 480 ---
 .../processing/util/CarbonLoaderUtilTest.java   | 125 +
 10 files changed, 545 insertions(+), 175 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d6d6f10b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index bcfeba0..a6bf60f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants {
*/
   public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000;
 
+  /**
+   * enable block size based block allocation while loading data. By default, 
carbondata assigns
+   * blocks to node based on block number. If this option is set to `true`, 
carbondata will
+   * consider block size first and make sure that all the nodes will process 
almost equal size of
+   * data. This option is especially useful when you encounter skewed data.
+   */
+  @CarbonProperty
+  public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
+  = "carbon.load.skewedDataOptimization.enabled";
+  public static final String 
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d6d6f10b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index a7bfdba..c0cebe0 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, 
Serializable {
 
   private String dataMapWriterPath;
 
+  /**
+   * comparator to sort by block size in descending order.
+   * Since each line is not exactly the same, the size of a InputSplit may 
differs,
+   * so we allow some deviation for these splits.
+   */
+  public static final Comparator DATA_SIZE_DESC_COMPARATOR =
+  new Comparator() {
+@Override public int compare(Distributable o1, Distributable o2) {
+  long diff =
+  ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) 
o2).getBlockLength();
+  return diff < 0 ? 1 : (diff == 0 ? 0 : -1);
+}
+  };
+
   public TableBlockInfo(String filePath, long blockOffset, String segmentId,
   String[] locations, long blockLength, ColumnarFormatVersion version,
   String[] deletedDeltaFilePath) {
@@ -434,4 +450,17 @@ public class TableBlockInfo 

carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

2018-02-12 Thread jackylk
Repository: carbondata
Updated Branches:
  refs/heads/carbonstore 937bdb867 -> fd450b151


[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

Carbondata assigns blocks to nodes at the beginning of data loading.
The previous block allocation strategy was block-number based, and it will
suffer from the skewed-data problem if the sizes of the input files differ a lot.

We introduced a size based block allocation strategy to optimize data
loading performance in skewed data scenario.

This closes #1808


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fd450b15
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fd450b15
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fd450b15

Branch: refs/heads/carbonstore
Commit: fd450b151cb5858504116f560da5cd7a357894e5
Parents: 937bdb8
Author: xuchuanyin 
Authored: Thu Feb 8 14:42:39 2018 +0800
Committer: Jacky Li 
Committed: Mon Feb 12 19:22:46 2018 +0800

--
 .../constants/CarbonLoadOptionConstants.java|  10 +
 .../core/datastore/block/TableBlockInfo.java|  29 ++
 .../carbondata/core/util/CarbonProperties.java  |  11 +
 docs/useful-tips-on-carbondata.md   |   1 +
 .../spark/rdd/NewCarbonDataLoadRDD.scala|   4 +-
 .../spark/sql/hive/DistributionUtil.scala   |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala|  18 +-
 .../merger/NodeMultiBlockRelation.java  |  40 ++
 .../processing/util/CarbonLoaderUtil.java   | 480 ---
 .../processing/util/CarbonLoaderUtilTest.java   | 125 +
 10 files changed, 545 insertions(+), 175 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd450b15/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index bcfeba0..a6bf60f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants {
*/
   public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000;
 
+  /**
+   * enable block size based block allocation while loading data. By default, 
carbondata assigns
+   * blocks to node based on block number. If this option is set to `true`, 
carbondata will
+   * consider block size first and make sure that all the nodes will process 
almost equal size of
+   * data. This option is especially useful when you encounter skewed data.
+   */
+  @CarbonProperty
+  public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
+  = "carbon.load.skewedDataOptimization.enabled";
+  public static final String 
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd450b15/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index 907708c..6624311 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -92,6 +94,20 @@ public class TableBlockInfo implements Distributable, 
Serializable {
 
   private String dataMapWriterPath;
 
+  /**
+   * comparator to sort by block size in descending order.
+   * Since each line is not exactly the same, the size of a InputSplit may 
differs,
+   * so we allow some deviation for these splits.
+   */
+  public static final Comparator DATA_SIZE_DESC_COMPARATOR =
+  new Comparator() {
+@Override public int compare(Distributable o1, Distributable o2) {
+  long diff =
+  ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) 
o2).getBlockLength();
+  return diff < 0 ? 1 : (diff == 0 ? 0 : -1);
+}
+  };
+
   public TableBlockInfo(String filePath, long blockOffset, String segmentId,
   String[] locations, long blockLength, ColumnarFormatVersion version,
   

carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading [Forced Update!]

2018-02-09 Thread jackylk
Repository: carbondata
Updated Branches:
  refs/heads/carbonstore-rebase 5c55dfe19 -> 132fbd4d8 (forced update)


[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

Carbondata assigns blocks to nodes at the beginning of data loading.
The previous block allocation strategy was block-number based, and it will
suffer from the skewed-data problem if the sizes of the input files differ a lot.

We introduced a size based block allocation strategy to optimize data
loading performance in skewed data scenario.

This closes #1808


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/132fbd4d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/132fbd4d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/132fbd4d

Branch: refs/heads/carbonstore-rebase
Commit: 132fbd4d8d2a91efca542bd31ae5935171802b7e
Parents: c82734e
Author: xuchuanyin 
Authored: Thu Feb 8 14:42:39 2018 +0800
Committer: Jacky Li 
Committed: Fri Feb 9 21:45:48 2018 +0800

--
 .../constants/CarbonLoadOptionConstants.java|  10 +
 .../core/datastore/block/TableBlockInfo.java|  29 ++
 .../carbondata/core/util/CarbonProperties.java  |  11 +
 docs/useful-tips-on-carbondata.md   |   1 +
 .../sdv/generated/MergeIndexTestCase.scala  |  11 +-
 .../CarbonIndexFileMergeTestCase.scala  |  19 +-
 .../StandardPartitionTableLoadingTestCase.scala |   5 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala|   4 +-
 .../spark/sql/hive/DistributionUtil.scala   |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala|  18 +-
 .../merger/NodeMultiBlockRelation.java  |  40 ++
 .../processing/util/CarbonLoaderUtil.java   | 494 ---
 .../processing/util/CarbonLoaderUtilTest.java   | 125 +
 13 files changed, 564 insertions(+), 205 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/carbondata/blob/132fbd4d/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index bcfeba0..a6bf60f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants {
*/
   public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000;
 
+  /**
+   * enable block size based block allocation while loading data. By default, 
carbondata assigns
+   * blocks to node based on block number. If this option is set to `true`, 
carbondata will
+   * consider block size first and make sure that all the nodes will process 
almost equal size of
+   * data. This option is especially useful when you encounter skewed data.
+   */
+  @CarbonProperty
+  public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
+  = "carbon.load.skewedDataOptimization.enabled";
+  public static final String 
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/132fbd4d/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index a7bfdba..c0cebe0 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, 
Serializable {
 
   private String dataMapWriterPath;
 
+  /**
+   * comparator to sort by block size in descending order.
+   * Since each line is not exactly the same, the size of a InputSplit may 
differs,
+   * so we allow some deviation for these splits.
+   */
+  public static final Comparator DATA_SIZE_DESC_COMPARATOR =
+  new Comparator() {
+@Override public int compare(Distributable o1, Distributable o2) {
+  long diff =
+  ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) 
o2).getBlockLength();
+  return diff < 0 ? 1 : 

carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

2018-02-08 Thread jackylk
Repository: carbondata
Updated Branches:
  refs/heads/carbonstore de92ea9a1 -> 6dd8b038f


[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading

Carbondata assigns blocks to nodes at the beginning of data loading.
The previous block allocation strategy was block-number based, and it will
suffer from the skewed-data problem if the sizes of the input files differ a lot.

We introduced a size based block allocation strategy to optimize data
loading performance in skewed data scenario.

This closes #1808


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6dd8b038
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6dd8b038
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6dd8b038

Branch: refs/heads/carbonstore
Commit: 6dd8b038fc898dbf48ad30adfc870c19eb38e3d0
Parents: de92ea9
Author: xuchuanyin 
Authored: Thu Feb 8 14:42:39 2018 +0800
Committer: Jacky Li 
Committed: Fri Feb 9 15:49:17 2018 +0800

--
 .../constants/CarbonLoadOptionConstants.java|  10 +
 .../core/datastore/block/TableBlockInfo.java|  29 ++
 .../carbondata/core/util/CarbonProperties.java  |  11 +
 docs/useful-tips-on-carbondata.md   |   1 +
 .../spark/rdd/NewCarbonDataLoadRDD.scala|   4 +-
 .../spark/sql/hive/DistributionUtil.scala   |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala|  18 +-
 .../merger/NodeMultiBlockRelation.java  |  40 ++
 .../processing/util/CarbonLoaderUtil.java   | 494 ---
 .../processing/util/CarbonLoaderUtilTest.java   | 125 +
 10 files changed, 551 insertions(+), 183 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6dd8b038/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index bcfeba0..a6bf60f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants {
*/
   public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000;
 
+  /**
+   * enable block size based block allocation while loading data. By default, 
carbondata assigns
+   * blocks to node based on block number. If this option is set to `true`, 
carbondata will
+   * consider block size first and make sure that all the nodes will process 
almost equal size of
+   * data. This option is especially useful when you encounter skewed data.
+   */
+  @CarbonProperty
+  public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
+  = "carbon.load.skewedDataOptimization.enabled";
+  public static final String 
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6dd8b038/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
index 907708c..6624311 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -92,6 +94,20 @@ public class TableBlockInfo implements Distributable, 
Serializable {
 
   private String dataMapWriterPath;
 
+  /**
+   * comparator to sort by block size in descending order.
+   * Since each line is not exactly the same, the size of a InputSplit may 
differs,
+   * so we allow some deviation for these splits.
+   */
+  public static final Comparator DATA_SIZE_DESC_COMPARATOR =
+  new Comparator() {
+@Override public int compare(Distributable o1, Distributable o2) {
+  long diff =
+  ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) 
o2).getBlockLength();
+  return diff < 0 ? 1 : (diff == 0 ? 0 : -1);
+}
+  };
+
   public TableBlockInfo(String filePath, long blockOffset, String segmentId,
   String[] locations, long blockLength, ColumnarFormatVersion version,