[46/50] [abbrv] carbondata git commit: [CARBONDATA-2091][DataLoad] Support specifying sort column bounds in data loading
[CARBONDATA-2091][DataLoad] Support specifying sort column bounds in data loading Enhance data loading performance by specifying sort column bounds 1. Add row range number during convert-process-step 2. Dispatch rows to each sorter by range number 3. Sort/Write process step can be done concurrently in each range 4. Since all sorttemp files will be written in one folder, we add range number to the file name to distinguish them Tests added and docs updated This closes #1953 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/faad967d Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/faad967d Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/faad967d Branch: refs/heads/carbonstore Commit: faad967d8d83eabd3e758b081370235e42a3ecee Parents: 623a1f9 Author: xuchuanyin Authored: Tue Feb 13 10:58:06 2018 +0800 Committer: Jacky Li Committed: Sun Mar 4 20:32:14 2018 +0800 -- .../constants/CarbonLoadOptionConstants.java| 10 + .../core/datastore/row/CarbonRow.java | 10 +- .../ThriftWrapperSchemaConverterImpl.java | 2 +- .../core/metadata/schema/BucketingInfo.java | 24 +- .../core/metadata/schema/ColumnRangeInfo.java | 29 ++ .../metadata/schema/SortColumnRangeInfo.java| 83 + docs/data-management-on-carbondata.md | 11 + .../TestLoadDataWithSortColumnBounds.scala | 348 +++ .../carbondata/spark/rdd/CarbonScanRDD.scala| 2 +- .../carbondata/spark/rdd/PartitionDropper.scala | 2 +- .../spark/rdd/PartitionSplitter.scala | 2 +- .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 3 +- .../strategy/CarbonLateDecodeStrategy.scala | 2 +- .../loading/CarbonDataLoadConfiguration.java| 11 + .../loading/DataLoadProcessBuilder.java | 77 +++- .../loading/converter/RowConverter.java | 2 +- .../converter/impl/RowConverterImpl.java| 5 + .../loading/model/CarbonLoadModel.java | 14 + .../loading/model/CarbonLoadModelBuilder.java | 1 + .../processing/loading/model/LoadOption.java| 1 + 
.../partition/impl/HashPartitionerImpl.java | 10 +- .../partition/impl/RangePartitionerImpl.java| 71 .../partition/impl/RawRowComparator.java| 63 .../processing/loading/sort/SorterFactory.java | 16 +- ...arallelReadMergeSorterWithBucketingImpl.java | 272 --- ...allelReadMergeSorterWithColumnRangeImpl.java | 289 +++ ...arallelReadMergeSorterWithBucketingImpl.java | 263 -- ...allelReadMergeSorterWithColumnRangeImpl.java | 293 .../loading/sort/unsafe/UnsafeSortDataRows.java | 6 +- .../unsafe/merger/UnsafeIntermediateMerger.java | 6 +- .../UnsafeSingleThreadFinalSortFilesMerger.java | 11 +- .../steps/DataConverterProcessorStepImpl.java | 102 +- ...ConverterProcessorWithBucketingStepImpl.java | 161 - .../steps/DataWriterProcessorStepImpl.java | 70 +++- .../SingleThreadFinalSortFilesMerger.java | 3 +- .../processing/sort/sortdata/SortDataRows.java | 11 +- .../sortdata/SortIntermediateFileMerger.java| 6 +- .../sort/sortdata/SortParameters.java | 10 + .../store/CarbonFactDataHandlerColumnar.java| 6 +- 39 files changed, 1558 insertions(+), 750 deletions(-) -- http://git-wip-us.apache.org/repos/asf/carbondata/blob/faad967d/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java index a6bf60f..8ff8dc4 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java @@ -124,4 +124,14 @@ public final class CarbonLoadOptionConstants { public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION = "carbon.load.skewedDataOptimization.enabled"; public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false"; + + /** + * field delimiter for each field in one bound + */ + public static final String 
SORT_COLUMN_BOUNDS_FIELD_DELIMITER = ","; + + /** + * row delimiter for each sort column bounds + */ + public static final String SORT_COLUMN_BOUNDS_ROW_DELIMITER = ";"; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/faad967d/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java -- diff --git
[46/50] [abbrv] carbondata git commit: [CARBONDATA-2091][DataLoad] Support specifying sort column bounds in data loading
[CARBONDATA-2091][DataLoad] Support specifying sort column bounds in data loading Enhance data loading performance by specifying sort column bounds 1. Add row range number during convert-process-step 2. Dispatch rows to each sorter by range number 3. Sort/Write process step can be done concurrently in each range 4. Since all sorttemp files will be written in one folder, we add range number to the file name to distinguish them Tests added and docs updated This closes #1953 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/faad967d Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/faad967d Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/faad967d Branch: refs/heads/carbonstore-rebase5 Commit: faad967d8d83eabd3e758b081370235e42a3ecee Parents: 623a1f9 Author: xuchuanyin Authored: Tue Feb 13 10:58:06 2018 +0800 Committer: Jacky Li Committed: Sun Mar 4 20:32:14 2018 +0800 -- .../constants/CarbonLoadOptionConstants.java| 10 + .../core/datastore/row/CarbonRow.java | 10 +- .../ThriftWrapperSchemaConverterImpl.java | 2 +- .../core/metadata/schema/BucketingInfo.java | 24 +- .../core/metadata/schema/ColumnRangeInfo.java | 29 ++ .../metadata/schema/SortColumnRangeInfo.java| 83 + docs/data-management-on-carbondata.md | 11 + .../TestLoadDataWithSortColumnBounds.scala | 348 +++ .../carbondata/spark/rdd/CarbonScanRDD.scala| 2 +- .../carbondata/spark/rdd/PartitionDropper.scala | 2 +- .../spark/rdd/PartitionSplitter.scala | 2 +- .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 3 +- .../strategy/CarbonLateDecodeStrategy.scala | 2 +- .../loading/CarbonDataLoadConfiguration.java| 11 + .../loading/DataLoadProcessBuilder.java | 77 +++- .../loading/converter/RowConverter.java | 2 +- .../converter/impl/RowConverterImpl.java| 5 + .../loading/model/CarbonLoadModel.java | 14 + .../loading/model/CarbonLoadModelBuilder.java | 1 + .../processing/loading/model/LoadOption.java| 1 + 
.../partition/impl/HashPartitionerImpl.java | 10 +- .../partition/impl/RangePartitionerImpl.java| 71 .../partition/impl/RawRowComparator.java| 63 .../processing/loading/sort/SorterFactory.java | 16 +- ...arallelReadMergeSorterWithBucketingImpl.java | 272 --- ...allelReadMergeSorterWithColumnRangeImpl.java | 289 +++ ...arallelReadMergeSorterWithBucketingImpl.java | 263 -- ...allelReadMergeSorterWithColumnRangeImpl.java | 293 .../loading/sort/unsafe/UnsafeSortDataRows.java | 6 +- .../unsafe/merger/UnsafeIntermediateMerger.java | 6 +- .../UnsafeSingleThreadFinalSortFilesMerger.java | 11 +- .../steps/DataConverterProcessorStepImpl.java | 102 +- ...ConverterProcessorWithBucketingStepImpl.java | 161 - .../steps/DataWriterProcessorStepImpl.java | 70 +++- .../SingleThreadFinalSortFilesMerger.java | 3 +- .../processing/sort/sortdata/SortDataRows.java | 11 +- .../sortdata/SortIntermediateFileMerger.java| 6 +- .../sort/sortdata/SortParameters.java | 10 + .../store/CarbonFactDataHandlerColumnar.java| 6 +- 39 files changed, 1558 insertions(+), 750 deletions(-) -- http://git-wip-us.apache.org/repos/asf/carbondata/blob/faad967d/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java index a6bf60f..8ff8dc4 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java @@ -124,4 +124,14 @@ public final class CarbonLoadOptionConstants { public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION = "carbon.load.skewedDataOptimization.enabled"; public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false"; + + /** + * field delimiter for each field in one bound + */ + public static final String 
SORT_COLUMN_BOUNDS_FIELD_DELIMITER = ","; + + /** + * row delimiter for each sort column bounds + */ + public static final String SORT_COLUMN_BOUNDS_ROW_DELIMITER = ";"; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/faad967d/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java -- diff
[46/50] [abbrv] carbondata git commit: [CARBONDATA-2091][DataLoad] Support specifying sort column bounds in data loading
[CARBONDATA-2091][DataLoad] Support specifying sort column bounds in data loading Enhance data loading performance by specifying sort column bounds 1. Add row range number during convert-process-step 2. Dispatch rows to each sorter by range number 3. Sort/Write process step can be done concurrently in each range 4. Since all sorttemp files will be written in one folder, we add range number to the file name to distinguish them Tests added and docs updated This closes #1953 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/49d06c20 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/49d06c20 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/49d06c20 Branch: refs/heads/carbonstore-rebase5 Commit: 49d06c205a06817139fe6b0bb99240013d33a92d Parents: 8996cd4 Author: xuchuanyin Authored: Tue Feb 13 10:58:06 2018 +0800 Committer: Jacky Li Committed: Sun Mar 4 20:08:30 2018 +0800 -- .../constants/CarbonLoadOptionConstants.java| 10 + .../core/datastore/row/CarbonRow.java | 10 +- .../ThriftWrapperSchemaConverterImpl.java | 2 +- .../core/metadata/schema/BucketingInfo.java | 24 +- .../core/metadata/schema/ColumnRangeInfo.java | 29 ++ .../metadata/schema/SortColumnRangeInfo.java| 83 + docs/data-management-on-carbondata.md | 11 + .../TestLoadDataWithSortColumnBounds.scala | 348 +++ .../carbondata/spark/rdd/CarbonScanRDD.scala| 2 +- .../carbondata/spark/rdd/PartitionDropper.scala | 2 +- .../spark/rdd/PartitionSplitter.scala | 2 +- .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 3 +- .../strategy/CarbonLateDecodeStrategy.scala | 2 +- .../loading/CarbonDataLoadConfiguration.java| 11 + .../loading/DataLoadProcessBuilder.java | 77 +++- .../loading/converter/RowConverter.java | 2 +- .../converter/impl/RowConverterImpl.java| 5 + .../loading/model/CarbonLoadModel.java | 14 + .../loading/model/CarbonLoadModelBuilder.java | 1 + .../processing/loading/model/LoadOption.java| 1 + 
.../partition/impl/HashPartitionerImpl.java | 10 +- .../partition/impl/RangePartitionerImpl.java| 71 .../partition/impl/RawRowComparator.java| 63 .../processing/loading/sort/SorterFactory.java | 16 +- ...arallelReadMergeSorterWithBucketingImpl.java | 272 --- ...allelReadMergeSorterWithColumnRangeImpl.java | 289 +++ ...arallelReadMergeSorterWithBucketingImpl.java | 263 -- ...allelReadMergeSorterWithColumnRangeImpl.java | 293 .../loading/sort/unsafe/UnsafeSortDataRows.java | 6 +- .../unsafe/merger/UnsafeIntermediateMerger.java | 6 +- .../UnsafeSingleThreadFinalSortFilesMerger.java | 11 +- .../steps/DataConverterProcessorStepImpl.java | 102 +- ...ConverterProcessorWithBucketingStepImpl.java | 161 - .../steps/DataWriterProcessorStepImpl.java | 70 +++- .../SingleThreadFinalSortFilesMerger.java | 3 +- .../processing/sort/sortdata/SortDataRows.java | 11 +- .../sortdata/SortIntermediateFileMerger.java| 6 +- .../sort/sortdata/SortParameters.java | 10 + .../store/CarbonFactDataHandlerColumnar.java| 6 +- 39 files changed, 1558 insertions(+), 750 deletions(-) -- http://git-wip-us.apache.org/repos/asf/carbondata/blob/49d06c20/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java -- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java index a6bf60f..8ff8dc4 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java @@ -124,4 +124,14 @@ public final class CarbonLoadOptionConstants { public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION = "carbon.load.skewedDataOptimization.enabled"; public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false"; + + /** + * field delimiter for each field in one bound + */ + public static final String 
SORT_COLUMN_BOUNDS_FIELD_DELIMITER = ","; + + /** + * row delimiter for each sort column bounds + */ + public static final String SORT_COLUMN_BOUNDS_ROW_DELIMITER = ";"; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/49d06c20/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java -- diff
[46/50] [abbrv] carbondata git commit: [CARBONDATA-2091][DataLoad] Support specifying sort column bounds in data loading
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c96285/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java -- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java deleted file mode 100644 index f605b22..000 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.processing.loading.sort.impl; - -import java.io.File; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import org.apache.carbondata.common.CarbonIterator; -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.metadata.schema.BucketingInfo; -import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; -import org.apache.carbondata.processing.loading.DataField; -import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException; -import org.apache.carbondata.processing.loading.row.CarbonRowBatch; -import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter; -import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage; -import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeSortDataRows; -import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger; -import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger; -import org.apache.carbondata.processing.sort.sortdata.SortParameters; -import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; - -/** - * It parallely reads data from array of iterates and do merge sort. - * First it sorts the data and write to temp files. These temp files will be merge sorted to get - * final merge sort result. - * This step is specifically for bucketing, it sorts each bucket data separately and write to - * temp files. 
- */ -public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorter { - - private static final LogService LOGGER = - LogServiceFactory.getLogService( - UnsafeParallelReadMergeSorterWithBucketingImpl.class.getName()); - - private SortParameters sortParameters; - - private BucketingInfo bucketingInfo; - - public UnsafeParallelReadMergeSorterWithBucketingImpl(DataField[] inputDataFields, - BucketingInfo bucketingInfo) { -this.bucketingInfo = bucketingInfo; - } - - @Override public void initialize(SortParameters sortParameters) { -this.sortParameters = sortParameters; - } - - @Override public Iterator[] sort(Iterator[] iterators) - throws CarbonDataLoadingException { -UnsafeSortDataRows[] sortDataRows = new UnsafeSortDataRows[bucketingInfo.getNumberOfBuckets()]; -UnsafeIntermediateMerger[] intermediateFileMergers = -new UnsafeIntermediateMerger[sortDataRows.length]; -int inMemoryChunkSizeInMB = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB(); -inMemoryChunkSizeInMB = inMemoryChunkSizeInMB / bucketingInfo.getNumberOfBuckets(); -if (inMemoryChunkSizeInMB < 5) { - inMemoryChunkSizeInMB = 5; -} -try { - for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) { -SortParameters parameters = sortParameters.getCopy(); -parameters.setPartitionID(i + ""); -setTempLocation(parameters); -