[46/50] [abbrv] carbondata git commit: [CARBONDATA-2091][DataLoad] Support specifying sort column bounds in data loading

2018-03-04 Thread jackylk
[CARBONDATA-2091][DataLoad] Support specifying sort column bounds in data 
loading

Enhance data loading performance by specifying sort column bounds
1. Add row range number during convert-process-step
2. Dispatch rows to each sorter by range number
3. Sort/Write process step can be done concurrently in each range
4. Since all sorttemp files will be written in one folder, we add the range
number to the file name to distinguish them

Tests added and docs updated

This closes #1953


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/faad967d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/faad967d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/faad967d

Branch: refs/heads/carbonstore
Commit: faad967d8d83eabd3e758b081370235e42a3ecee
Parents: 623a1f9
Author: xuchuanyin 
Authored: Tue Feb 13 10:58:06 2018 +0800
Committer: Jacky Li 
Committed: Sun Mar 4 20:32:14 2018 +0800

--
 .../constants/CarbonLoadOptionConstants.java|  10 +
 .../core/datastore/row/CarbonRow.java   |  10 +-
 .../ThriftWrapperSchemaConverterImpl.java   |   2 +-
 .../core/metadata/schema/BucketingInfo.java |  24 +-
 .../core/metadata/schema/ColumnRangeInfo.java   |  29 ++
 .../metadata/schema/SortColumnRangeInfo.java|  83 +
 docs/data-management-on-carbondata.md   |  11 +
 .../TestLoadDataWithSortColumnBounds.scala  | 348 +++
 .../carbondata/spark/rdd/CarbonScanRDD.scala|   2 +-
 .../carbondata/spark/rdd/PartitionDropper.scala |   2 +-
 .../spark/rdd/PartitionSplitter.scala   |   2 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   3 +-
 .../strategy/CarbonLateDecodeStrategy.scala |   2 +-
 .../loading/CarbonDataLoadConfiguration.java|  11 +
 .../loading/DataLoadProcessBuilder.java |  77 +++-
 .../loading/converter/RowConverter.java |   2 +-
 .../converter/impl/RowConverterImpl.java|   5 +
 .../loading/model/CarbonLoadModel.java  |  14 +
 .../loading/model/CarbonLoadModelBuilder.java   |   1 +
 .../processing/loading/model/LoadOption.java|   1 +
 .../partition/impl/HashPartitionerImpl.java |  10 +-
 .../partition/impl/RangePartitionerImpl.java|  71 
 .../partition/impl/RawRowComparator.java|  63 
 .../processing/loading/sort/SorterFactory.java  |  16 +-
 ...arallelReadMergeSorterWithBucketingImpl.java | 272 ---
 ...allelReadMergeSorterWithColumnRangeImpl.java | 289 +++
 ...arallelReadMergeSorterWithBucketingImpl.java | 263 --
 ...allelReadMergeSorterWithColumnRangeImpl.java | 293 
 .../loading/sort/unsafe/UnsafeSortDataRows.java |   6 +-
 .../unsafe/merger/UnsafeIntermediateMerger.java |   6 +-
 .../UnsafeSingleThreadFinalSortFilesMerger.java |  11 +-
 .../steps/DataConverterProcessorStepImpl.java   | 102 +-
 ...ConverterProcessorWithBucketingStepImpl.java | 161 -
 .../steps/DataWriterProcessorStepImpl.java  |  70 +++-
 .../SingleThreadFinalSortFilesMerger.java   |   3 +-
 .../processing/sort/sortdata/SortDataRows.java  |  11 +-
 .../sortdata/SortIntermediateFileMerger.java|   6 +-
 .../sort/sortdata/SortParameters.java   |  10 +
 .../store/CarbonFactDataHandlerColumnar.java|   6 +-
 39 files changed, 1558 insertions(+), 750 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/carbondata/blob/faad967d/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index a6bf60f..8ff8dc4 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -124,4 +124,14 @@ public final class CarbonLoadOptionConstants {
   public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
   = "carbon.load.skewedDataOptimization.enabled";
   public static final String 
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
+
+  /**
+   * field delimiter for each field in one bound
+   */
+  public static final String SORT_COLUMN_BOUNDS_FIELD_DELIMITER = ",";
+
+  /**
+   * row delimiter for each sort column bounds
+   */
+  public static final String SORT_COLUMN_BOUNDS_ROW_DELIMITER = ";";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/faad967d/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
--
diff --git 

[46/50] [abbrv] carbondata git commit: [CARBONDATA-2091][DataLoad] Support specifying sort column bounds in data loading

2018-03-04 Thread jackylk
[CARBONDATA-2091][DataLoad] Support specifying sort column bounds in data 
loading

Enhance data loading performance by specifying sort column bounds
1. Add row range number during convert-process-step
2. Dispatch rows to each sorter by range number
3. Sort/Write process step can be done concurrently in each range
4. Since all sorttemp files will be written in one folder, we add the range
number to the file name to distinguish them

Tests added and docs updated

This closes #1953


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/faad967d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/faad967d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/faad967d

Branch: refs/heads/carbonstore-rebase5
Commit: faad967d8d83eabd3e758b081370235e42a3ecee
Parents: 623a1f9
Author: xuchuanyin 
Authored: Tue Feb 13 10:58:06 2018 +0800
Committer: Jacky Li 
Committed: Sun Mar 4 20:32:14 2018 +0800

--
 .../constants/CarbonLoadOptionConstants.java|  10 +
 .../core/datastore/row/CarbonRow.java   |  10 +-
 .../ThriftWrapperSchemaConverterImpl.java   |   2 +-
 .../core/metadata/schema/BucketingInfo.java |  24 +-
 .../core/metadata/schema/ColumnRangeInfo.java   |  29 ++
 .../metadata/schema/SortColumnRangeInfo.java|  83 +
 docs/data-management-on-carbondata.md   |  11 +
 .../TestLoadDataWithSortColumnBounds.scala  | 348 +++
 .../carbondata/spark/rdd/CarbonScanRDD.scala|   2 +-
 .../carbondata/spark/rdd/PartitionDropper.scala |   2 +-
 .../spark/rdd/PartitionSplitter.scala   |   2 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   3 +-
 .../strategy/CarbonLateDecodeStrategy.scala |   2 +-
 .../loading/CarbonDataLoadConfiguration.java|  11 +
 .../loading/DataLoadProcessBuilder.java |  77 +++-
 .../loading/converter/RowConverter.java |   2 +-
 .../converter/impl/RowConverterImpl.java|   5 +
 .../loading/model/CarbonLoadModel.java  |  14 +
 .../loading/model/CarbonLoadModelBuilder.java   |   1 +
 .../processing/loading/model/LoadOption.java|   1 +
 .../partition/impl/HashPartitionerImpl.java |  10 +-
 .../partition/impl/RangePartitionerImpl.java|  71 
 .../partition/impl/RawRowComparator.java|  63 
 .../processing/loading/sort/SorterFactory.java  |  16 +-
 ...arallelReadMergeSorterWithBucketingImpl.java | 272 ---
 ...allelReadMergeSorterWithColumnRangeImpl.java | 289 +++
 ...arallelReadMergeSorterWithBucketingImpl.java | 263 --
 ...allelReadMergeSorterWithColumnRangeImpl.java | 293 
 .../loading/sort/unsafe/UnsafeSortDataRows.java |   6 +-
 .../unsafe/merger/UnsafeIntermediateMerger.java |   6 +-
 .../UnsafeSingleThreadFinalSortFilesMerger.java |  11 +-
 .../steps/DataConverterProcessorStepImpl.java   | 102 +-
 ...ConverterProcessorWithBucketingStepImpl.java | 161 -
 .../steps/DataWriterProcessorStepImpl.java  |  70 +++-
 .../SingleThreadFinalSortFilesMerger.java   |   3 +-
 .../processing/sort/sortdata/SortDataRows.java  |  11 +-
 .../sortdata/SortIntermediateFileMerger.java|   6 +-
 .../sort/sortdata/SortParameters.java   |  10 +
 .../store/CarbonFactDataHandlerColumnar.java|   6 +-
 39 files changed, 1558 insertions(+), 750 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/carbondata/blob/faad967d/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index a6bf60f..8ff8dc4 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -124,4 +124,14 @@ public final class CarbonLoadOptionConstants {
   public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
   = "carbon.load.skewedDataOptimization.enabled";
   public static final String 
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
+
+  /**
+   * field delimiter for each field in one bound
+   */
+  public static final String SORT_COLUMN_BOUNDS_FIELD_DELIMITER = ",";
+
+  /**
+   * row delimiter for each sort column bounds
+   */
+  public static final String SORT_COLUMN_BOUNDS_ROW_DELIMITER = ";";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/faad967d/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
--
diff 

[46/50] [abbrv] carbondata git commit: [CARBONDATA-2091][DataLoad] Support specifying sort column bounds in data loading

2018-03-04 Thread jackylk
[CARBONDATA-2091][DataLoad] Support specifying sort column bounds in data 
loading

Enhance data loading performance by specifying sort column bounds
1. Add row range number during convert-process-step
2. Dispatch rows to each sorter by range number
3. Sort/Write process step can be done concurrently in each range
4. Since all sorttemp files will be written in one folder, we add the range
number to the file name to distinguish them

Tests added and docs updated

This closes #1953


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/49d06c20
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/49d06c20
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/49d06c20

Branch: refs/heads/carbonstore-rebase5
Commit: 49d06c205a06817139fe6b0bb99240013d33a92d
Parents: 8996cd4
Author: xuchuanyin 
Authored: Tue Feb 13 10:58:06 2018 +0800
Committer: Jacky Li 
Committed: Sun Mar 4 20:08:30 2018 +0800

--
 .../constants/CarbonLoadOptionConstants.java|  10 +
 .../core/datastore/row/CarbonRow.java   |  10 +-
 .../ThriftWrapperSchemaConverterImpl.java   |   2 +-
 .../core/metadata/schema/BucketingInfo.java |  24 +-
 .../core/metadata/schema/ColumnRangeInfo.java   |  29 ++
 .../metadata/schema/SortColumnRangeInfo.java|  83 +
 docs/data-management-on-carbondata.md   |  11 +
 .../TestLoadDataWithSortColumnBounds.scala  | 348 +++
 .../carbondata/spark/rdd/CarbonScanRDD.scala|   2 +-
 .../carbondata/spark/rdd/PartitionDropper.scala |   2 +-
 .../spark/rdd/PartitionSplitter.scala   |   2 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   3 +-
 .../strategy/CarbonLateDecodeStrategy.scala |   2 +-
 .../loading/CarbonDataLoadConfiguration.java|  11 +
 .../loading/DataLoadProcessBuilder.java |  77 +++-
 .../loading/converter/RowConverter.java |   2 +-
 .../converter/impl/RowConverterImpl.java|   5 +
 .../loading/model/CarbonLoadModel.java  |  14 +
 .../loading/model/CarbonLoadModelBuilder.java   |   1 +
 .../processing/loading/model/LoadOption.java|   1 +
 .../partition/impl/HashPartitionerImpl.java |  10 +-
 .../partition/impl/RangePartitionerImpl.java|  71 
 .../partition/impl/RawRowComparator.java|  63 
 .../processing/loading/sort/SorterFactory.java  |  16 +-
 ...arallelReadMergeSorterWithBucketingImpl.java | 272 ---
 ...allelReadMergeSorterWithColumnRangeImpl.java | 289 +++
 ...arallelReadMergeSorterWithBucketingImpl.java | 263 --
 ...allelReadMergeSorterWithColumnRangeImpl.java | 293 
 .../loading/sort/unsafe/UnsafeSortDataRows.java |   6 +-
 .../unsafe/merger/UnsafeIntermediateMerger.java |   6 +-
 .../UnsafeSingleThreadFinalSortFilesMerger.java |  11 +-
 .../steps/DataConverterProcessorStepImpl.java   | 102 +-
 ...ConverterProcessorWithBucketingStepImpl.java | 161 -
 .../steps/DataWriterProcessorStepImpl.java  |  70 +++-
 .../SingleThreadFinalSortFilesMerger.java   |   3 +-
 .../processing/sort/sortdata/SortDataRows.java  |  11 +-
 .../sortdata/SortIntermediateFileMerger.java|   6 +-
 .../sort/sortdata/SortParameters.java   |  10 +
 .../store/CarbonFactDataHandlerColumnar.java|   6 +-
 39 files changed, 1558 insertions(+), 750 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/carbondata/blob/49d06c20/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
--
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index a6bf60f..8ff8dc4 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -124,4 +124,14 @@ public final class CarbonLoadOptionConstants {
   public static final String ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION
   = "carbon.load.skewedDataOptimization.enabled";
   public static final String 
ENABLE_CARBON_LOAD_SKEWED_DATA_OPTIMIZATION_DEFAULT = "false";
+
+  /**
+   * field delimiter for each field in one bound
+   */
+  public static final String SORT_COLUMN_BOUNDS_FIELD_DELIMITER = ",";
+
+  /**
+   * row delimiter for each sort column bounds
+   */
+  public static final String SORT_COLUMN_BOUNDS_ROW_DELIMITER = ";";
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/49d06c20/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
--
diff 

[46/50] [abbrv] carbondata git commit: [CARBONDATA-2091][DataLoad] Support specifying sort column bounds in data loading

2018-03-02 Thread jackylk
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c96285/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
--
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
deleted file mode 100644
index f605b22..000
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.loading.sort.impl;
-
-import java.io.File;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.schema.BucketingInfo;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.loading.DataField;
-import 
org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
-import org.apache.carbondata.processing.loading.sort.AbstractMergeSorter;
-import 
org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeSortDataRows;
-import 
org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger;
-import 
org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
-import org.apache.carbondata.processing.sort.sortdata.SortParameters;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * It parallely reads data from array of iterates and do merge sort.
- * First it sorts the data and write to temp files. These temp files will be 
merge sorted to get
- * final merge sort result.
- * This step is specifically for bucketing, it sorts each bucket data 
separately and write to
- * temp files.
- */
-public class UnsafeParallelReadMergeSorterWithBucketingImpl extends 
AbstractMergeSorter {
-
-  private static final LogService LOGGER =
-  LogServiceFactory.getLogService(
-
UnsafeParallelReadMergeSorterWithBucketingImpl.class.getName());
-
-  private SortParameters sortParameters;
-
-  private BucketingInfo bucketingInfo;
-
-  public UnsafeParallelReadMergeSorterWithBucketingImpl(DataField[] 
inputDataFields,
-  BucketingInfo bucketingInfo) {
-this.bucketingInfo = bucketingInfo;
-  }
-
-  @Override public void initialize(SortParameters sortParameters) {
-this.sortParameters = sortParameters;
-  }
-
-  @Override public Iterator[] sort(Iterator[] 
iterators)
-  throws CarbonDataLoadingException {
-UnsafeSortDataRows[] sortDataRows = new 
UnsafeSortDataRows[bucketingInfo.getNumberOfBuckets()];
-UnsafeIntermediateMerger[] intermediateFileMergers =
-new UnsafeIntermediateMerger[sortDataRows.length];
-int inMemoryChunkSizeInMB = 
CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
-inMemoryChunkSizeInMB = inMemoryChunkSizeInMB / 
bucketingInfo.getNumberOfBuckets();
-if (inMemoryChunkSizeInMB < 5) {
-  inMemoryChunkSizeInMB = 5;
-}
-try {
-  for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
-SortParameters parameters = sortParameters.getCopy();
-parameters.setPartitionID(i + "");
-setTempLocation(parameters);
-