[14/54] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading
[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading Carbondata assign blocks to nodes at the beginning of data loading. Previous block allocation strategy is block number based and it will suffer skewed data problem if the size of input files differs a lot. We introduced a size based block allocation strategy to optimize data loading performance in skewed data scenario. This closes #1808 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9a423c24 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9a423c24 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9a423c24 Branch: refs/heads/master Commit: 9a423c241b01d6e0b3f1946c052230adb47f4538 Parents: 2b41f14 Author: xuchuanyinAuthored: Thu Feb 8 14:42:39 2018 +0800 Committer: ravipesala Committed: Thu Mar 8 22:21:10 2018 +0530 -- .../constants/CarbonLoadOptionConstants.java| 10 + .../core/datastore/block/TableBlockInfo.java| 29 ++ .../carbondata/core/util/CarbonProperties.java | 11 + docs/useful-tips-on-carbondata.md | 1 + .../spark/rdd/NewCarbonDataLoadRDD.scala| 4 +- .../spark/sql/hive/DistributionUtil.scala | 2 +- .../spark/rdd/CarbonDataRDDFactory.scala| 18 +- .../merger/NodeMultiBlockRelation.java | 40 ++ .../processing/util/CarbonLoaderUtil.java | 480 --- .../processing/util/CarbonLoaderUtilTest.java | 125 + 10 files changed, 545 insertions(+), 175 deletions(-) -- http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a423c24/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java index bcfeba0..a6bf60f 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java +++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java @@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants { */ public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000; + /** + * enable block size based block allocation while loading data. By default, carbondata assigns + * blocks to node based on block number. If this option is set to `true`, carbondata will + * consider block size first and make sure that all the nodes will process almost equal size of + * data. This option is especially useful when you encounter skewed data. + */ + @CarbonProperty + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION + = "carbon.load.skewedDataOptimization.enabled"; + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false"; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9a423c24/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java index a7bfdba..c0cebe0 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java @@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block; import java.io.IOException; import java.io.Serializable; import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; @@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, Serializable { private String dataMapWriterPath; + /** + * comparator to sort by block size in descending order. + * Since each line is not exactly the same, the size of a InputSplit may differs, + * so we allow some deviation for these splits. 
+ */ + public static final Comparator DATA_SIZE_DESC_COMPARATOR = + new Comparator() { +@Override public int compare(Distributable o1, Distributable o2) { + long diff = + ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) o2).getBlockLength(); + return diff < 0 ? 1 : (diff == 0 ? 0 : -1); +} + }; + public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations, long blockLength, ColumnarFormatVersion version, String[] deletedDeltaFilePath) { @@ -434,4 +450,17 @@ public class TableBlockInfo
[21/54] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java -- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java index 7ea5cb3..e5583c2 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java @@ -19,20 +19,35 @@ package org.apache.carbondata.processing.loading.sort.unsafe; import java.io.DataOutputStream; import java.io.IOException; -import java.nio.ByteBuffer; +import java.math.BigDecimal; +import java.util.Arrays; +import org.apache.carbondata.core.memory.CarbonUnsafe; import org.apache.carbondata.core.memory.IntPointerBuffer; import org.apache.carbondata.core.memory.MemoryBlock; import org.apache.carbondata.core.memory.UnsafeMemoryManager; import org.apache.carbondata.core.memory.UnsafeSortMemoryManager; -import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow; -import org.apache.carbondata.processing.loading.sort.SortStepRowHandler; -import org.apache.carbondata.processing.sort.sortdata.TableFieldStat; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.util.DataTypeUtil; /** * It can keep the data of prescribed size data in offheap/onheap memory and returns it when needed */ public class UnsafeCarbonRowPage { + + private boolean[] noDictionaryDimensionMapping; + + private boolean[] noDictionarySortColumnMapping; + + private int dimensionSize; + + private int measureSize; + + private DataType[] measureDataType; + + private long[] nullSetWords; + private IntPointerBuffer buffer; private int 
lastSize; @@ -47,14 +62,16 @@ public class UnsafeCarbonRowPage { private long taskId; - private TableFieldStat tableFieldStat; - private SortStepRowHandler sortStepRowHandler; - - public UnsafeCarbonRowPage(TableFieldStat tableFieldStat, MemoryBlock memoryBlock, - boolean saveToDisk, long taskId) { -this.tableFieldStat = tableFieldStat; -this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat); + public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping, + boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, DataType[] type, + MemoryBlock memoryBlock, boolean saveToDisk, long taskId) { +this.noDictionaryDimensionMapping = noDictionaryDimensionMapping; +this.noDictionarySortColumnMapping = noDictionarySortColumnMapping; +this.dimensionSize = dimensionSize; +this.measureSize = measureSize; +this.measureDataType = type; this.saveToDisk = saveToDisk; +this.nullSetWords = new long[((measureSize - 1) >> 6) + 1]; this.taskId = taskId; buffer = new IntPointerBuffer(this.taskId); this.dataBlock = memoryBlock; @@ -63,44 +80,255 @@ public class UnsafeCarbonRowPage { this.managerType = MemoryManagerType.UNSAFE_MEMORY_MANAGER; } - public int addRow(Object[] row, ByteBuffer rowBuffer) { -int size = addRow(row, dataBlock.getBaseOffset() + lastSize, rowBuffer); + public int addRow(Object[] row) { +int size = addRow(row, dataBlock.getBaseOffset() + lastSize); buffer.set(lastSize); lastSize = lastSize + size; return size; } - /** - * add raw row as intermidiate sort temp row to page - * - * @param row - * @param address - * @return - */ - private int addRow(Object[] row, long address, ByteBuffer rowBuffer) { -return sortStepRowHandler.writeRawRowAsIntermediateSortTempRowToUnsafeMemory(row, -dataBlock.getBaseObject(), address, rowBuffer); + private int addRow(Object[] row, long address) { +if (row == null) { + throw new RuntimeException("Row is null ??"); +} +int dimCount = 0; +int size = 0; +Object baseObject = dataBlock.getBaseObject(); +for 
(; dimCount < noDictionaryDimensionMapping.length; dimCount++) { + if (noDictionaryDimensionMapping[dimCount]) { +byte[] col = (byte[]) row[dimCount]; +CarbonUnsafe.getUnsafe() +.putShort(baseObject, address + size, (short) col.length); +size += 2; +CarbonUnsafe.getUnsafe().copyMemory(col, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, +address + size, col.length); +size += col.length; + } else { +int value = (int) row[dimCount]; +CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, value); +size += 4; + } +} + +// write complex dimensions here. +for (; dimCount < dimensionSize; dimCount++) { +
[22/54] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading
[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading Carbondata assign blocks to nodes at the beginning of data loading. Previous block allocation strategy is block number based and it will suffer skewed data problem if the size of input files differs a lot. We introduced a size based block allocation strategy to optimize data loading performance in skewed data scenario. This closes #1808 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8d8b589e Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8d8b589e Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8d8b589e Branch: refs/heads/master Commit: 8d8b589e78a9db1ddc101d20c1e3feb500acce19 Parents: 21704cf Author: xuchuanyinAuthored: Thu Feb 8 14:42:39 2018 +0800 Committer: ravipesala Committed: Thu Mar 8 22:21:10 2018 +0530 -- .../core/datamap/dev/AbstractDataMapWriter.java | 5 +- .../core/datamap/dev/DataMapFactory.java| 2 +- .../blockletindex/BlockletDataMapFactory.java | 2 +- .../SegmentUpdateStatusManager.java | 9 +- .../carbondata/core/util/NonDictionaryUtil.java | 67 ++- .../datamap/examples/MinMaxDataMapFactory.java | 5 +- .../datamap/examples/MinMaxDataWriter.java | 7 +- .../presto/util/CarbonDataStoreCreator.scala| 1 + .../CarbonIndexFileMergeTestCase.scala | 4 - .../testsuite/datamap/CGDataMapTestCase.scala | 26 +- .../testsuite/datamap/DataMapWriterSuite.scala | 19 +- .../testsuite/datamap/FGDataMapTestCase.scala | 31 +- .../iud/DeleteCarbonTableTestCase.scala | 2 +- .../TestInsertAndOtherCommandConcurrent.scala | 14 +- .../StandardPartitionTableCleanTestCase.scala | 12 +- .../StandardPartitionTableLoadingTestCase.scala | 2 +- .../load/DataLoadProcessorStepOnSpark.scala | 6 +- .../carbondata/spark/util/DataLoadingUtil.scala | 2 +- .../datamap/DataMapWriterListener.java | 2 +- .../loading/row/IntermediateSortTempRow.java| 117 - .../loading/sort/SortStepRowHandler.java| 466 --- 
.../loading/sort/SortStepRowUtil.java | 103 .../sort/unsafe/UnsafeCarbonRowPage.java| 331 +++-- .../loading/sort/unsafe/UnsafeSortDataRows.java | 57 ++- .../unsafe/comparator/UnsafeRowComparator.java | 95 ++-- .../UnsafeRowComparatorForNormalDIms.java | 59 +++ .../UnsafeRowComparatorForNormalDims.java | 59 --- .../sort/unsafe/holder/SortTempChunkHolder.java | 3 +- .../holder/UnsafeFinalMergePageHolder.java | 19 +- .../unsafe/holder/UnsafeInmemoryHolder.java | 21 +- .../holder/UnsafeSortTempFileChunkHolder.java | 138 -- .../merger/UnsafeIntermediateFileMerger.java| 118 - .../UnsafeSingleThreadFinalSortFilesMerger.java | 27 +- .../processing/merger/CarbonDataMergerUtil.java | 8 +- .../merger/CompactionResultSortProcessor.java | 5 +- .../merger/RowResultMergerProcessor.java| 5 +- .../partition/spliter/RowResultProcessor.java | 5 +- .../sort/sortdata/IntermediateFileMerger.java | 95 +++- .../IntermediateSortTempRowComparator.java | 73 --- .../sort/sortdata/NewRowComparator.java | 5 +- .../sortdata/NewRowComparatorForNormalDims.java | 3 +- .../processing/sort/sortdata/RowComparator.java | 94 .../sortdata/RowComparatorForNormalDims.java| 62 +++ .../SingleThreadFinalSortFilesMerger.java | 25 +- .../processing/sort/sortdata/SortDataRows.java | 85 +++- .../sort/sortdata/SortTempFileChunkHolder.java | 174 +-- .../sort/sortdata/TableFieldStat.java | 176 --- .../util/CarbonDataProcessorUtil.java | 4 +- .../processing/util/CarbonLoaderUtil.java | 9 - 49 files changed, 1368 insertions(+), 1291 deletions(-) -- http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java index bcc9bad..de6dcb1 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java +++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java @@ -18,6 +18,7 @@ package org.apache.carbondata.core.datamap.dev; import java.io.IOException; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; import
[20/54] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java -- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java new file mode 100644 index 000..0ae0b93 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.processing.sort.sortdata; + +import java.nio.ByteBuffer; +import java.util.Comparator; + +import org.apache.carbondata.core.datastore.row.WriteStepRowUtil; +import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer; +import org.apache.carbondata.core.util.NonDictionaryUtil; + +public class RowComparator implements Comparator
[34/50] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading
[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading Carbondata assign blocks to nodes at the beginning of data loading. Previous block allocation strategy is block number based and it will suffer skewed data problem if the size of input files differs a lot. We introduced a size based block allocation strategy to optimize data loading performance in skewed data scenario. This closes #1808 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8fe8ab4c Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8fe8ab4c Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8fe8ab4c Branch: refs/heads/carbonstore Commit: 8fe8ab4c078de0ccd218f8ba41352896aebd5202 Parents: 28b5720 Author: xuchuanyinAuthored: Thu Feb 8 14:42:39 2018 +0800 Committer: Jacky Li Committed: Sun Mar 4 20:32:13 2018 +0800 -- .../constants/CarbonLoadOptionConstants.java| 10 + .../core/datastore/block/TableBlockInfo.java| 29 ++ .../carbondata/core/util/CarbonProperties.java | 11 + docs/useful-tips-on-carbondata.md | 1 + .../spark/rdd/NewCarbonDataLoadRDD.scala| 4 +- .../spark/sql/hive/DistributionUtil.scala | 2 +- .../spark/rdd/CarbonDataRDDFactory.scala| 18 +- .../merger/NodeMultiBlockRelation.java | 40 ++ .../processing/util/CarbonLoaderUtil.java | 480 --- .../processing/util/CarbonLoaderUtilTest.java | 125 + 10 files changed, 545 insertions(+), 175 deletions(-) -- http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fe8ab4c/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java index bcfeba0..a6bf60f 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java +++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java @@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants { */ public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000; + /** + * enable block size based block allocation while loading data. By default, carbondata assigns + * blocks to node based on block number. If this option is set to `true`, carbondata will + * consider block size first and make sure that all the nodes will process almost equal size of + * data. This option is especially useful when you encounter skewed data. + */ + @CarbonProperty + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION + = "carbon.load.skewedDataOptimization.enabled"; + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false"; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fe8ab4c/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java index a7bfdba..c0cebe0 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java @@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block; import java.io.IOException; import java.io.Serializable; import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; @@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, Serializable { private String dataMapWriterPath; + /** + * comparator to sort by block size in descending order. + * Since each line is not exactly the same, the size of a InputSplit may differs, + * so we allow some deviation for these splits. 
+ */ + public static final Comparator DATA_SIZE_DESC_COMPARATOR = + new Comparator() { +@Override public int compare(Distributable o1, Distributable o2) { + long diff = + ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) o2).getBlockLength(); + return diff < 0 ? 1 : (diff == 0 ? 0 : -1); +} + }; + public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations, long blockLength, ColumnarFormatVersion version, String[] deletedDeltaFilePath) { @@ -434,4 +450,17 @@ public class TableBlockInfo
[25/50] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading
[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading Carbondata assign blocks to nodes at the beginning of data loading. Previous block allocation strategy is block number based and it will suffer skewed data problem if the size of input files differs a lot. We introduced a size based block allocation strategy to optimize data loading performance in skewed data scenario. This closes #1808 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3fdd5d0f Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3fdd5d0f Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3fdd5d0f Branch: refs/heads/carbonstore Commit: 3fdd5d0f567e8d07cc502202ced7d490fa85e2ad Parents: 0bb4aed Author: xuchuanyinAuthored: Thu Feb 8 14:42:39 2018 +0800 Committer: Jacky Li Committed: Sun Mar 4 20:32:12 2018 +0800 -- .../constants/CarbonLoadOptionConstants.java| 10 + .../core/datastore/block/TableBlockInfo.java| 29 ++ .../carbondata/core/util/CarbonProperties.java | 11 + docs/useful-tips-on-carbondata.md | 1 + .../CarbonIndexFileMergeTestCase.scala | 4 - .../StandardPartitionTableLoadingTestCase.scala | 2 +- .../spark/rdd/NewCarbonDataLoadRDD.scala| 4 +- .../spark/sql/hive/DistributionUtil.scala | 2 +- .../spark/rdd/CarbonDataRDDFactory.scala| 18 +- .../merger/NodeMultiBlockRelation.java | 40 ++ .../processing/util/CarbonLoaderUtil.java | 494 --- .../processing/util/CarbonLoaderUtilTest.java | 125 + 12 files changed, 552 insertions(+), 188 deletions(-) -- http://git-wip-us.apache.org/repos/asf/carbondata/blob/3fdd5d0f/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java index bcfeba0..a6bf60f 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java @@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants { */ public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000; + /** + * enable block size based block allocation while loading data. By default, carbondata assigns + * blocks to node based on block number. If this option is set to `true`, carbondata will + * consider block size first and make sure that all the nodes will process almost equal size of + * data. This option is especially useful when you encounter skewed data. + */ + @CarbonProperty + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION + = "carbon.load.skewedDataOptimization.enabled"; + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false"; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3fdd5d0f/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java index a7bfdba..c0cebe0 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java @@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block; import java.io.IOException; import java.io.Serializable; import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; @@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, Serializable { private String dataMapWriterPath; + /** + * comparator to sort by block size in descending order. 
+ * Since each line is not exactly the same, the size of a InputSplit may differs, + * so we allow some deviation for these splits. + */ + public static final Comparator DATA_SIZE_DESC_COMPARATOR = + new Comparator() { +@Override public int compare(Distributable o1, Distributable o2) { + long diff = + ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) o2).getBlockLength(); + return diff < 0 ? 1 : (diff == 0 ? 0 : -1); +} + }; + public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations, long blockLength,
[25/50] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading
[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading Carbondata assign blocks to nodes at the beginning of data loading. Previous block allocation strategy is block number based and it will suffer skewed data problem if the size of input files differs a lot. We introduced a size based block allocation strategy to optimize data loading performance in skewed data scenario. This closes #1808 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3fdd5d0f Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3fdd5d0f Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3fdd5d0f Branch: refs/heads/carbonstore-rebase5 Commit: 3fdd5d0f567e8d07cc502202ced7d490fa85e2ad Parents: 0bb4aed Author: xuchuanyinAuthored: Thu Feb 8 14:42:39 2018 +0800 Committer: Jacky Li Committed: Sun Mar 4 20:32:12 2018 +0800 -- .../constants/CarbonLoadOptionConstants.java| 10 + .../core/datastore/block/TableBlockInfo.java| 29 ++ .../carbondata/core/util/CarbonProperties.java | 11 + docs/useful-tips-on-carbondata.md | 1 + .../CarbonIndexFileMergeTestCase.scala | 4 - .../StandardPartitionTableLoadingTestCase.scala | 2 +- .../spark/rdd/NewCarbonDataLoadRDD.scala| 4 +- .../spark/sql/hive/DistributionUtil.scala | 2 +- .../spark/rdd/CarbonDataRDDFactory.scala| 18 +- .../merger/NodeMultiBlockRelation.java | 40 ++ .../processing/util/CarbonLoaderUtil.java | 494 --- .../processing/util/CarbonLoaderUtilTest.java | 125 + 12 files changed, 552 insertions(+), 188 deletions(-) -- http://git-wip-us.apache.org/repos/asf/carbondata/blob/3fdd5d0f/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java index bcfeba0..a6bf60f 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java @@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants { */ public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000; + /** + * enable block size based block allocation while loading data. By default, carbondata assigns + * blocks to node based on block number. If this option is set to `true`, carbondata will + * consider block size first and make sure that all the nodes will process almost equal size of + * data. This option is especially useful when you encounter skewed data. + */ + @CarbonProperty + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION + = "carbon.load.skewedDataOptimization.enabled"; + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false"; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3fdd5d0f/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java index a7bfdba..c0cebe0 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java @@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block; import java.io.IOException; import java.io.Serializable; import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; @@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, Serializable { private String dataMapWriterPath; + /** + * comparator to sort by block size in descending order. 
+ * Since each line is not exactly the same, the size of a InputSplit may differs, + * so we allow some deviation for these splits. + */ + public static final Comparator DATA_SIZE_DESC_COMPARATOR = + new Comparator() { +@Override public int compare(Distributable o1, Distributable o2) { + long diff = + ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) o2).getBlockLength(); + return diff < 0 ? 1 : (diff == 0 ? 0 : -1); +} + }; + public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations, long blockLength,
[34/50] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading
[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading Carbondata assign blocks to nodes at the beginning of data loading. Previous block allocation strategy is block number based and it will suffer skewed data problem if the size of input files differs a lot. We introduced a size based block allocation strategy to optimize data loading performance in skewed data scenario. This closes #1808 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8fe8ab4c Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8fe8ab4c Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8fe8ab4c Branch: refs/heads/carbonstore-rebase5 Commit: 8fe8ab4c078de0ccd218f8ba41352896aebd5202 Parents: 28b5720 Author: xuchuanyinAuthored: Thu Feb 8 14:42:39 2018 +0800 Committer: Jacky Li Committed: Sun Mar 4 20:32:13 2018 +0800 -- .../constants/CarbonLoadOptionConstants.java| 10 + .../core/datastore/block/TableBlockInfo.java| 29 ++ .../carbondata/core/util/CarbonProperties.java | 11 + docs/useful-tips-on-carbondata.md | 1 + .../spark/rdd/NewCarbonDataLoadRDD.scala| 4 +- .../spark/sql/hive/DistributionUtil.scala | 2 +- .../spark/rdd/CarbonDataRDDFactory.scala| 18 +- .../merger/NodeMultiBlockRelation.java | 40 ++ .../processing/util/CarbonLoaderUtil.java | 480 --- .../processing/util/CarbonLoaderUtilTest.java | 125 + 10 files changed, 545 insertions(+), 175 deletions(-) -- http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fe8ab4c/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java index bcfeba0..a6bf60f 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java +++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java @@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants { */ public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000; + /** + * enable block size based block allocation while loading data. By default, carbondata assigns + * blocks to node based on block number. If this option is set to `true`, carbondata will + * consider block size first and make sure that all the nodes will process almost equal size of + * data. This option is especially useful when you encounter skewed data. + */ + @CarbonProperty + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION + = "carbon.load.skewedDataOptimization.enabled"; + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false"; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fe8ab4c/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java index a7bfdba..c0cebe0 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java @@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block; import java.io.IOException; import java.io.Serializable; import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; @@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, Serializable { private String dataMapWriterPath; + /** + * comparator to sort by block size in descending order. + * Since each line is not exactly the same, the size of a InputSplit may differs, + * so we allow some deviation for these splits. 
+ */ + public static final Comparator DATA_SIZE_DESC_COMPARATOR = + new Comparator() { +@Override public int compare(Distributable o1, Distributable o2) { + long diff = + ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) o2).getBlockLength(); + return diff < 0 ? 1 : (diff == 0 ? 0 : -1); +} + }; + public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations, long blockLength, ColumnarFormatVersion version, String[] deletedDeltaFilePath) { @@ -434,4 +450,17 @@ public class TableBlockInfo
[32/50] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading
[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading Carbondata assign blocks to nodes at the beginning of data loading. Previous block allocation strategy is block number based and it will suffer skewed data problem if the size of input files differs a lot. We introduced a size based block allocation strategy to optimize data loading performance in skewed data scenario. This closes #1808 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/111bb5c4 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/111bb5c4 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/111bb5c4 Branch: refs/heads/carbonstore-rebase5 Commit: 111bb5c41946d72604964658ec8562f7722dec14 Parents: ef81248 Author: xuchuanyinAuthored: Thu Feb 8 14:42:39 2018 +0800 Committer: Jacky Li Committed: Sun Mar 4 19:54:03 2018 +0800 -- .../constants/CarbonLoadOptionConstants.java| 10 + .../core/datastore/block/TableBlockInfo.java| 29 ++ .../carbondata/core/util/CarbonProperties.java | 11 + docs/useful-tips-on-carbondata.md | 1 + .../CarbonIndexFileMergeTestCase.scala | 4 - .../StandardPartitionTableLoadingTestCase.scala | 2 +- .../spark/rdd/NewCarbonDataLoadRDD.scala| 4 +- .../spark/sql/hive/DistributionUtil.scala | 2 +- .../spark/rdd/CarbonDataRDDFactory.scala| 18 +- .../merger/NodeMultiBlockRelation.java | 40 ++ .../processing/util/CarbonLoaderUtil.java | 494 --- .../processing/util/CarbonLoaderUtilTest.java | 125 + 12 files changed, 552 insertions(+), 188 deletions(-) -- http://git-wip-us.apache.org/repos/asf/carbondata/blob/111bb5c4/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java index bcfeba0..a6bf60f 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java @@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants { */ public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000; + /** + * enable block size based block allocation while loading data. By default, carbondata assigns + * blocks to node based on block number. If this option is set to `true`, carbondata will + * consider block size first and make sure that all the nodes will process almost equal size of + * data. This option is especially useful when you encounter skewed data. + */ + @CarbonProperty + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION + = "carbon.load.skewedDataOptimization.enabled"; + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false"; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/111bb5c4/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java index a7bfdba..c0cebe0 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java @@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block; import java.io.IOException; import java.io.Serializable; import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; @@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, Serializable { private String dataMapWriterPath; + /** + * comparator to sort by block size in descending order. 
+ * Since each line is not exactly the same, the size of a InputSplit may differs, + * so we allow some deviation for these splits. + */ + public static final Comparator DATA_SIZE_DESC_COMPARATOR = + new Comparator() { +@Override public int compare(Distributable o1, Distributable o2) { + long diff = + ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) o2).getBlockLength(); + return diff < 0 ? 1 : (diff == 0 ? 0 : -1); +} + }; + public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations, long blockLength,
[27/50] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading
[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading Carbondata assign blocks to nodes at the beginning of data loading. Previous block allocation strategy is block number based and it will suffer skewed data problem if the size of input files differs a lot. We introduced a size based block allocation strategy to optimize data loading performance in skewed data scenario. This closes #1808 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b5908d2c Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b5908d2c Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b5908d2c Branch: refs/heads/carbonstore-rebase5 Commit: b5908d2c100e66373670a40d1c785a0d932489ce Parents: 65daaca Author: xuchuanyinAuthored: Thu Feb 8 14:42:39 2018 +0800 Committer: Jacky Li Committed: Fri Mar 2 15:52:35 2018 +0800 -- .../constants/CarbonLoadOptionConstants.java| 10 + .../core/datastore/block/TableBlockInfo.java| 29 ++ .../carbondata/core/util/CarbonProperties.java | 11 + docs/useful-tips-on-carbondata.md | 1 + .../sdv/generated/MergeIndexTestCase.scala | 11 +- .../CarbonIndexFileMergeTestCase.scala | 19 +- .../StandardPartitionTableLoadingTestCase.scala | 2 +- .../spark/rdd/NewCarbonDataLoadRDD.scala| 4 +- .../spark/sql/hive/DistributionUtil.scala | 2 +- .../spark/rdd/CarbonDataRDDFactory.scala| 18 +- .../merger/NodeMultiBlockRelation.java | 40 ++ .../processing/util/CarbonLoaderUtil.java | 494 --- .../processing/util/CarbonLoaderUtilTest.java | 125 + 13 files changed, 563 insertions(+), 203 deletions(-) -- http://git-wip-us.apache.org/repos/asf/carbondata/blob/b5908d2c/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java index bcfeba0..a6bf60f 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java @@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants { */ public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000; + /** + * enable block size based block allocation while loading data. By default, carbondata assigns + * blocks to node based on block number. If this option is set to `true`, carbondata will + * consider block size first and make sure that all the nodes will process almost equal size of + * data. This option is especially useful when you encounter skewed data. + */ + @CarbonProperty + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION + = "carbon.load.skewedDataOptimization.enabled"; + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false"; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/b5908d2c/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java index a7bfdba..c0cebe0 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java @@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block; import java.io.IOException; import java.io.Serializable; import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; @@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, Serializable { private String dataMapWriterPath; + /** + * comparator to sort by block size in descending order. 
+ * Since each line is not exactly the same, the size of a InputSplit may differs, + * so we allow some deviation for these splits. + */ + public static final Comparator DATA_SIZE_DESC_COMPARATOR = + new Comparator() { +@Override public int compare(Distributable o1, Distributable o2) { + long diff = + ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) o2).getBlockLength(); + return diff < 0 ? 1 : (diff == 0 ? 0 : -1); +} + }; + public TableBlockInfo(String filePath, long blockOffset, String
[25/49] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading
[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading Carbondata assign blocks to nodes at the beginning of data loading. Previous block allocation strategy is block number based and it will suffer skewed data problem if the size of input files differs a lot. We introduced a size based block allocation strategy to optimize data loading performance in skewed data scenario. This closes #1808 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7070f54b Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7070f54b Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7070f54b Branch: refs/heads/carbonstore-rebase5 Commit: 7070f54b221c2b8120841fb283c2c69bde55ee8f Parents: c270ebb Author: xuchuanyinAuthored: Thu Feb 8 14:42:39 2018 +0800 Committer: Jacky Li Committed: Wed Feb 28 22:05:21 2018 +0800 -- .../constants/CarbonLoadOptionConstants.java| 10 + .../core/datastore/block/TableBlockInfo.java| 29 ++ .../carbondata/core/util/CarbonProperties.java | 11 + docs/useful-tips-on-carbondata.md | 1 + .../sdv/generated/MergeIndexTestCase.scala | 11 +- .../CarbonIndexFileMergeTestCase.scala | 19 +- .../StandardPartitionTableLoadingTestCase.scala | 2 +- .../spark/rdd/NewCarbonDataLoadRDD.scala| 4 +- .../spark/sql/hive/DistributionUtil.scala | 2 +- .../spark/rdd/CarbonDataRDDFactory.scala| 18 +- .../merger/NodeMultiBlockRelation.java | 40 ++ .../processing/util/CarbonLoaderUtil.java | 494 --- .../processing/util/CarbonLoaderUtilTest.java | 125 + 13 files changed, 563 insertions(+), 203 deletions(-) -- http://git-wip-us.apache.org/repos/asf/carbondata/blob/7070f54b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java index bcfeba0..a6bf60f 100644 
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java @@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants { */ public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000; + /** + * enable block size based block allocation while loading data. By default, carbondata assigns + * blocks to node based on block number. If this option is set to `true`, carbondata will + * consider block size first and make sure that all the nodes will process almost equal size of + * data. This option is especially useful when you encounter skewed data. + */ + @CarbonProperty + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION + = "carbon.load.skewedDataOptimization.enabled"; + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false"; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7070f54b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java index a7bfdba..c0cebe0 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java @@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block; import java.io.IOException; import java.io.Serializable; import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; @@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, Serializable { private String dataMapWriterPath; + /** + * comparator to sort by block size in descending order. 
+ * Since each line is not exactly the same, the size of a InputSplit may differs, + * so we allow some deviation for these splits. + */ + public static final Comparator DATA_SIZE_DESC_COMPARATOR = + new Comparator() { +@Override public int compare(Distributable o1, Distributable o2) { + long diff = + ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) o2).getBlockLength(); + return diff < 0 ? 1 : (diff == 0 ? 0 : -1); +} + }; + public TableBlockInfo(String filePath, long blockOffset, String
[45/50] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading
[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading Carbondata assign blocks to nodes at the beginning of data loading. Previous block allocation strategy is block number based and it will suffer skewed data problem if the size of input files differs a lot. We introduced a size based block allocation strategy to optimize data loading performance in skewed data scenario. This closes #1808 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/bfc9fc66 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/bfc9fc66 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/bfc9fc66 Branch: refs/heads/carbonstore-rebase4 Commit: bfc9fc6697a09c6ee64df5db2c9df6a052a74528 Parents: d3289b8 Author: xuchuanyinAuthored: Thu Feb 8 14:42:39 2018 +0800 Committer: Jacky Li Committed: Tue Feb 27 17:04:19 2018 +0800 -- .../constants/CarbonLoadOptionConstants.java| 10 + .../core/datastore/block/TableBlockInfo.java| 29 ++ .../carbondata/core/util/CarbonProperties.java | 11 + docs/useful-tips-on-carbondata.md | 1 + .../spark/rdd/NewCarbonDataLoadRDD.scala| 4 +- .../spark/sql/hive/DistributionUtil.scala | 2 +- .../spark/rdd/CarbonDataRDDFactory.scala| 18 +- .../merger/NodeMultiBlockRelation.java | 40 ++ .../processing/util/CarbonLoaderUtil.java | 480 --- .../processing/util/CarbonLoaderUtilTest.java | 125 + 10 files changed, 545 insertions(+), 175 deletions(-) -- http://git-wip-us.apache.org/repos/asf/carbondata/blob/bfc9fc66/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java index bcfeba0..a6bf60f 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java +++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java @@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants { */ public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000; + /** + * enable block size based block allocation while loading data. By default, carbondata assigns + * blocks to node based on block number. If this option is set to `true`, carbondata will + * consider block size first and make sure that all the nodes will process almost equal size of + * data. This option is especially useful when you encounter skewed data. + */ + @CarbonProperty + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION + = "carbon.load.skewedDataOptimization.enabled"; + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false"; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/bfc9fc66/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java index a7bfdba..c0cebe0 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java @@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block; import java.io.IOException; import java.io.Serializable; import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; @@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, Serializable { private String dataMapWriterPath; + /** + * comparator to sort by block size in descending order. + * Since each line is not exactly the same, the size of a InputSplit may differs, + * so we allow some deviation for these splits. 
+ */ + public static final Comparator DATA_SIZE_DESC_COMPARATOR = + new Comparator() { +@Override public int compare(Distributable o1, Distributable o2) { + long diff = + ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) o2).getBlockLength(); + return diff < 0 ? 1 : (diff == 0 ? 0 : -1); +} + }; + public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations, long blockLength, ColumnarFormatVersion version, String[] deletedDeltaFilePath) { @@ -434,4 +450,17 @@ public class TableBlockInfo
[26/50] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading
[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading Carbondata assign blocks to nodes at the beginning of data loading. Previous block allocation strategy is block number based and it will suffer skewed data problem if the size of input files differs a lot. We introduced a size based block allocation strategy to optimize data loading performance in skewed data scenario. This closes #1808 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/25b09408 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/25b09408 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/25b09408 Branch: refs/heads/carbonstore-rebase4 Commit: 25b094081614b42ab9634e54c8b4446d1c8ef5c0 Parents: 3adb67d Author: xuchuanyinAuthored: Thu Feb 8 14:42:39 2018 +0800 Committer: Jacky Li Committed: Tue Feb 27 16:59:48 2018 +0800 -- .../constants/CarbonLoadOptionConstants.java| 10 + .../core/datastore/block/TableBlockInfo.java| 29 ++ .../carbondata/core/util/CarbonProperties.java | 11 + docs/useful-tips-on-carbondata.md | 1 + .../sdv/generated/MergeIndexTestCase.scala | 11 +- .../CarbonIndexFileMergeTestCase.scala | 19 +- .../StandardPartitionTableLoadingTestCase.scala | 2 +- .../spark/rdd/NewCarbonDataLoadRDD.scala| 4 +- .../spark/sql/hive/DistributionUtil.scala | 2 +- .../spark/rdd/CarbonDataRDDFactory.scala| 18 +- .../merger/NodeMultiBlockRelation.java | 40 ++ .../processing/util/CarbonLoaderUtil.java | 494 --- .../processing/util/CarbonLoaderUtilTest.java | 125 + 13 files changed, 563 insertions(+), 203 deletions(-) -- http://git-wip-us.apache.org/repos/asf/carbondata/blob/25b09408/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java index bcfeba0..a6bf60f 100644 
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java @@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants { */ public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000; + /** + * enable block size based block allocation while loading data. By default, carbondata assigns + * blocks to node based on block number. If this option is set to `true`, carbondata will + * consider block size first and make sure that all the nodes will process almost equal size of + * data. This option is especially useful when you encounter skewed data. + */ + @CarbonProperty + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION + = "carbon.load.skewedDataOptimization.enabled"; + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false"; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/25b09408/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java index a7bfdba..c0cebe0 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java @@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block; import java.io.IOException; import java.io.Serializable; import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; @@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, Serializable { private String dataMapWriterPath; + /** + * comparator to sort by block size in descending order. 
+ * Since each line is not exactly the same, the size of a InputSplit may differs, + * so we allow some deviation for these splits. + */ + public static final Comparator DATA_SIZE_DESC_COMPARATOR = + new Comparator() { +@Override public int compare(Distributable o1, Distributable o2) { + long diff = + ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) o2).getBlockLength(); + return diff < 0 ? 1 : (diff == 0 ? 0 : -1); +} + }; + public TableBlockInfo(String filePath, long blockOffset, String
[43/49] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading
[CARBONDATA-2023][DataLoad] Add size base block allocation in data loading Carbondata assign blocks to nodes at the beginning of data loading. Previous block allocation strategy is block number based and it will suffer skewed data problem if the size of input files differs a lot. We introduced a size based block allocation strategy to optimize data loading performance in skewed data scenario. This closes #1808 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d6d6f10b Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d6d6f10b Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d6d6f10b Branch: refs/heads/carbonstore-rebase4 Commit: d6d6f10bc7e4e8ee5e8d1a9ee2b723ada37564a3 Parents: 93b2efd Author: xuchuanyinAuthored: Thu Feb 8 14:42:39 2018 +0800 Committer: Jacky Li Committed: Tue Feb 27 09:20:37 2018 +0800 -- .../constants/CarbonLoadOptionConstants.java| 10 + .../core/datastore/block/TableBlockInfo.java| 29 ++ .../carbondata/core/util/CarbonProperties.java | 11 + docs/useful-tips-on-carbondata.md | 1 + .../spark/rdd/NewCarbonDataLoadRDD.scala| 4 +- .../spark/sql/hive/DistributionUtil.scala | 2 +- .../spark/rdd/CarbonDataRDDFactory.scala| 18 +- .../merger/NodeMultiBlockRelation.java | 40 ++ .../processing/util/CarbonLoaderUtil.java | 480 --- .../processing/util/CarbonLoaderUtilTest.java | 125 + 10 files changed, 545 insertions(+), 175 deletions(-) -- http://git-wip-us.apache.org/repos/asf/carbondata/blob/d6d6f10b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java index bcfeba0..a6bf60f 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java +++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java @@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants { */ public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000; + /** + * enable block size based block allocation while loading data. By default, carbondata assigns + * blocks to node based on block number. If this option is set to `true`, carbondata will + * consider block size first and make sure that all the nodes will process almost equal size of + * data. This option is especially useful when you encounter skewed data. + */ + @CarbonProperty + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION + = "carbon.load.skewedDataOptimization.enabled"; + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false"; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d6d6f10b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java index a7bfdba..c0cebe0 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java @@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block; import java.io.IOException; import java.io.Serializable; import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; @@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, Serializable { private String dataMapWriterPath; + /** + * comparator to sort by block size in descending order. + * Since each line is not exactly the same, the size of a InputSplit may differs, + * so we allow some deviation for these splits. 
+ */ + public static final Comparator DATA_SIZE_DESC_COMPARATOR = + new Comparator() { +@Override public int compare(Distributable o1, Distributable o2) { + long diff = + ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) o2).getBlockLength(); + return diff < 0 ? 1 : (diff == 0 ? 0 : -1); +} + }; + public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations, long blockLength, ColumnarFormatVersion version, String[] deletedDeltaFilePath) { @@ -434,4 +450,17 @@ public class TableBlockInfo
carbondata git commit: [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading
Repository: carbondata Updated Branches: refs/heads/carbonstore 937bdb867 -> fd450b151 [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading Carbondata assign blocks to nodes at the beginning of data loading. Previous block allocation strategy is block number based and it will suffer skewed data problem if the size of input files differs a lot. We introduced a size based block allocation strategy to optimize data loading performance in skewed data scenario. This closes #1808 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fd450b15 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fd450b15 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fd450b15 Branch: refs/heads/carbonstore Commit: fd450b151cb5858504116f560da5cd7a357894e5 Parents: 937bdb8 Author: xuchuanyinAuthored: Thu Feb 8 14:42:39 2018 +0800 Committer: Jacky Li Committed: Mon Feb 12 19:22:46 2018 +0800 -- .../constants/CarbonLoadOptionConstants.java| 10 + .../core/datastore/block/TableBlockInfo.java| 29 ++ .../carbondata/core/util/CarbonProperties.java | 11 + docs/useful-tips-on-carbondata.md | 1 + .../spark/rdd/NewCarbonDataLoadRDD.scala| 4 +- .../spark/sql/hive/DistributionUtil.scala | 2 +- .../spark/rdd/CarbonDataRDDFactory.scala| 18 +- .../merger/NodeMultiBlockRelation.java | 40 ++ .../processing/util/CarbonLoaderUtil.java | 480 --- .../processing/util/CarbonLoaderUtilTest.java | 125 + 10 files changed, 545 insertions(+), 175 deletions(-) -- http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd450b15/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java index bcfeba0..a6bf60f 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java @@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants { */ public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000; + /** + * enable block size based block allocation while loading data. By default, carbondata assigns + * blocks to node based on block number. If this option is set to `true`, carbondata will + * consider block size first and make sure that all the nodes will process almost equal size of + * data. This option is especially useful when you encounter skewed data. + */ + @CarbonProperty + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION + = "carbon.load.skewedDataOptimization.enabled"; + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false"; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd450b15/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java index 907708c..6624311 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java @@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block; import java.io.IOException; import java.io.Serializable; import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; @@ -92,6 +94,20 @@ public class TableBlockInfo implements Distributable, Serializable { private String dataMapWriterPath; + /** + * comparator to sort by block size in descending order. 
+ * Since each line is not exactly the same, the size of a InputSplit may differs, + * so we allow some deviation for these splits. + */ + public static final Comparator DATA_SIZE_DESC_COMPARATOR = + new Comparator() { +@Override public int compare(Distributable o1, Distributable o2) { + long diff = + ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) o2).getBlockLength(); + return diff < 0 ? 1 : (diff == 0 ? 0 : -1); +} + }; + public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations, long blockLength, ColumnarFormatVersion version,
carbondata git commit: [CARBONDATA-2023][DataLoad] Add size-based block allocation in data loading [Forced Update!]
Repository: carbondata Updated Branches: refs/heads/carbonstore-rebase 5c55dfe19 -> 132fbd4d8 (forced update) [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading Carbondata assign blocks to nodes at the beginning of data loading. Previous block allocation strategy is block number based and it will suffer skewed data problem if the size of input files differs a lot. We introduced a size based block allocation strategy to optimize data loading performance in skewed data scenario. This closes #1808 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/132fbd4d Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/132fbd4d Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/132fbd4d Branch: refs/heads/carbonstore-rebase Commit: 132fbd4d8d2a91efca542bd31ae5935171802b7e Parents: c82734e Author: xuchuanyinAuthored: Thu Feb 8 14:42:39 2018 +0800 Committer: Jacky Li Committed: Fri Feb 9 21:45:48 2018 +0800 -- .../constants/CarbonLoadOptionConstants.java| 10 + .../core/datastore/block/TableBlockInfo.java| 29 ++ .../carbondata/core/util/CarbonProperties.java | 11 + docs/useful-tips-on-carbondata.md | 1 + .../sdv/generated/MergeIndexTestCase.scala | 11 +- .../CarbonIndexFileMergeTestCase.scala | 19 +- .../StandardPartitionTableLoadingTestCase.scala | 5 +- .../spark/rdd/NewCarbonDataLoadRDD.scala| 4 +- .../spark/sql/hive/DistributionUtil.scala | 2 +- .../spark/rdd/CarbonDataRDDFactory.scala| 18 +- .../merger/NodeMultiBlockRelation.java | 40 ++ .../processing/util/CarbonLoaderUtil.java | 494 --- .../processing/util/CarbonLoaderUtilTest.java | 125 + 13 files changed, 564 insertions(+), 205 deletions(-) -- http://git-wip-us.apache.org/repos/asf/carbondata/blob/132fbd4d/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java index bcfeba0..a6bf60f 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java @@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants { */ public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000; + /** + * enable block size based block allocation while loading data. By default, carbondata assigns + * blocks to node based on block number. If this option is set to `true`, carbondata will + * consider block size first and make sure that all the nodes will process almost equal size of + * data. This option is especially useful when you encounter skewed data. + */ + @CarbonProperty + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION + = "carbon.load.skewedDataOptimization.enabled"; + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false"; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/132fbd4d/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java index a7bfdba..c0cebe0 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java @@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block; import java.io.IOException; import java.io.Serializable; import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; @@ -98,6 +100,20 @@ public class TableBlockInfo implements Distributable, Serializable { private String dataMapWriterPath; + /** + * comparator to sort by 
block size in descending order. + * Since each line is not exactly the same, the size of an InputSplit may differ, + * so we allow some deviation for these splits. + */ + public static final Comparator DATA_SIZE_DESC_COMPARATOR = + new Comparator() { +@Override public int compare(Distributable o1, Distributable o2) { + long diff = + ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) o2).getBlockLength(); + return diff < 0 ? 1 :
carbondata git commit: [CARBONDATA-2023][DataLoad] Add size-based block allocation in data loading
Repository: carbondata Updated Branches: refs/heads/carbonstore de92ea9a1 -> 6dd8b038f [CARBONDATA-2023][DataLoad] Add size base block allocation in data loading Carbondata assign blocks to nodes at the beginning of data loading. Previous block allocation strategy is block number based and it will suffer skewed data problem if the size of input files differs a lot. We introduced a size based block allocation strategy to optimize data loading performance in skewed data scenario. This closes #1808 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6dd8b038 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6dd8b038 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6dd8b038 Branch: refs/heads/carbonstore Commit: 6dd8b038fc898dbf48ad30adfc870c19eb38e3d0 Parents: de92ea9 Author: xuchuanyinAuthored: Thu Feb 8 14:42:39 2018 +0800 Committer: Jacky Li Committed: Fri Feb 9 15:49:17 2018 +0800 -- .../constants/CarbonLoadOptionConstants.java| 10 + .../core/datastore/block/TableBlockInfo.java| 29 ++ .../carbondata/core/util/CarbonProperties.java | 11 + docs/useful-tips-on-carbondata.md | 1 + .../spark/rdd/NewCarbonDataLoadRDD.scala| 4 +- .../spark/sql/hive/DistributionUtil.scala | 2 +- .../spark/rdd/CarbonDataRDDFactory.scala| 18 +- .../merger/NodeMultiBlockRelation.java | 40 ++ .../processing/util/CarbonLoaderUtil.java | 494 --- .../processing/util/CarbonLoaderUtilTest.java | 125 + 10 files changed, 551 insertions(+), 183 deletions(-) -- http://git-wip-us.apache.org/repos/asf/carbondata/blob/6dd8b038/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java index bcfeba0..a6bf60f 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java @@ -114,4 +114,14 @@ public final class CarbonLoadOptionConstants { */ public static final int MAX_EXTERNAL_DICTIONARY_SIZE = 1000; + /** + * enable block size based block allocation while loading data. By default, carbondata assigns + * blocks to node based on block number. If this option is set to `true`, carbondata will + * consider block size first and make sure that all the nodes will process almost equal size of + * data. This option is especially useful when you encounter skewed data. + */ + @CarbonProperty + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION + = "carbon.load.skewedDataOptimization.enabled"; + public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false"; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6dd8b038/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java index 907708c..6624311 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/TableBlockInfo.java @@ -19,6 +19,8 @@ package org.apache.carbondata.core.datastore.block; import java.io.IOException; import java.io.Serializable; import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; import java.util.Map; @@ -92,6 +94,20 @@ public class TableBlockInfo implements Distributable, Serializable { private String dataMapWriterPath; + /** + * comparator to sort by block size in descending order. 
+ * Since each line is not exactly the same, the size of an InputSplit may differ, + * so we allow some deviation for these splits. + */ + public static final Comparator DATA_SIZE_DESC_COMPARATOR = + new Comparator() { +@Override public int compare(Distributable o1, Distributable o2) { + long diff = + ((TableBlockInfo) o1).getBlockLength() - ((TableBlockInfo) o2).getBlockLength(); + return diff < 0 ? 1 : (diff == 0 ? 0 : -1); +} + }; + public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations, long blockLength, ColumnarFormatVersion version,