[carbondata] branch master updated: [CARBONDATA-3714] Support specify order type when list stage files

2020-03-05 Thread jackylk
This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
 new 5c28ee6  [CARBONDATA-3714] Support specify order type when list stage 
files
5c28ee6 is described below

commit 5c28ee61433c4529d86a7611cf9768884f6c59bc
Author: liuzhi <371684...@qq.com>
AuthorDate: Tue Feb 18 16:46:35 2020 +0800

[CARBONDATA-3714] Support specify order type when list stage files

Why is this PR needed?
Sometimes, user want load the lastest data to table first.

What changes were proposed in this PR?
Add "batch_file_order" option for CarbonInsertFromStagesCommand.

Does this PR introduce any user interface change?
Yes. (One option "batch_file_order" is added for 
CarbonInsertFromStageCommand, document added)

Is any new testcase added?
Yes

This closes #3628
---
 docs/dml-of-carbondata.md  | 14 +
 .../org/apache/carbon/flink/TestCarbonWriter.scala | 15 ++---
 .../management/CarbonInsertFromStageCommand.scala  | 70 +-
 3 files changed, 78 insertions(+), 21 deletions(-)

diff --git a/docs/dml-of-carbondata.md b/docs/dml-of-carbondata.md
index 9d935c8..49e5664 100644
--- a/docs/dml-of-carbondata.md
+++ b/docs/dml-of-carbondata.md
@@ -323,6 +323,7 @@ CarbonData DML statements are documented here,which 
includes:
 | Property| Description
  |
 | --- | 
 |
 | [BATCH_FILE_COUNT](#batch_file_count)   | The number of 
stage files per processing |
+| [BATCH_FILE_ORDER](#batch_file_order)   | The order type of 
stage files in per processing |
 
 -
   You can use the following options to load data:
@@ -334,11 +335,24 @@ CarbonData DML statements are documented here,which 
includes:
 OPTIONS('batch_file_count'='5')
 ```
 
+  - # BATCH_FILE_ORDER: 
+The order type of stage files in per processing, choices: ASC, DESC.
+The default is ASC.
+Stage files will order by the last modified time with the specified order 
type.
+
+``` 
+OPTIONS('batch_file_order'='DESC')
+```
+
   Examples:
   ```
   INSERT INTO table1 STAGE
 
   INSERT INTO table1 STAGE OPTIONS('batch_file_count' = '5')
+  Note: This command use the default file order, will insert the earliest 
stage files into the table.
+
+  INSERT INTO table1 STAGE OPTIONS('batch_file_count' = '5', 
'batch_file_order'='DESC')
+  Note: This command will insert the latest stage files into the table.
   ```
 
 ### Load Data Using Static Partition 
diff --git 
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
 
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
index 1d82a75..396703d 100644
--- 
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
+++ 
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
@@ -27,6 +27,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
 import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.test.util.QueryTest
+
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -37,7 +38,6 @@ class TestCarbonWriter extends QueryTest {
   val tableName = "test_flink"
   val bucketTableName = "insert_bucket_table"
 
-
   test("Writing flink data to local carbon table") {
 sql(s"DROP TABLE IF EXISTS $tableName").collect()
 sql(
@@ -281,9 +281,9 @@ class TestCarbonWriter extends QueryTest {
 
   val plan = sql(
 s"""
-  |select t1.*, t2.*
-  |from $tableName t1, $bucketTableName t2
-  |where t1.stringField = t2.stringField
+   |select t1.*, t2.*
+   |from $tableName t1, $bucketTableName t2
+   |where t1.stringField = t2.stringField
   """.stripMargin).queryExecution.executedPlan
   var shuffleExists = false
   plan.collect {
@@ -297,9 +297,9 @@ class TestCarbonWriter extends QueryTest {
 
   checkAnswer(sql(
 s"""select count(*) from
-  |(select t1.*, t2.*
-  |from $tableName t1, $bucketTableName t2
-  |where t1.stringField = t2.stringField) temp
+   |(select t1.*, t2.*
+   |from $tableName t1, $bucketTableName t2
+   |where t1.stringField = t2.stringField) temp
   """.stripMargin), Row(1000))
 } finally {
   sql(s"DROP TABLE IF EXISTS 

[carbondata] branch master updated: [CARBONDATA-3700] Optimize prune performance when prunning with multi-threads

2020-03-05 Thread liuzhi
This is an automated email from the ASF dual-hosted git repository.

liuzhi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
 new 3acc2de  [CARBONDATA-3700] Optimize prune performance when prunning 
with multi-threads
3acc2de is described below

commit 3acc2de351940c00564744ddf5da2a681a481a75
Author: h00424960 
AuthorDate: Fri Feb 14 23:25:59 2020 +0800

[CARBONDATA-3700] Optimize prune performance when prunning with 
multi-threads

Why is this PR needed?
1. When pruning with multi-threads, there is a bug hambers the prunning 
performance heavily.
When the pruning results in no blocklets to map the query filter, The 
getExtendblocklet function will be triggered to get the extend blocklet 
metadata, when the Input of this function is an empty blocklet list, this 
function is expected to return an empty extendblocklet list directyly , but now 
there is a bug leading to "a hashset add operation" overhead which is 
meaningless.
Meanwhile, When pruning with multi-threads, the getExtendblocklet function 
will be triggerd for each blocklet, which should be avoided by triggerring this 
function for each segment.
2. When pruning, there is a bug hambers the prunning performance heavily.
ValidatePartitionInfo operation is executed by every blocklet, and it 
iterates all the partitions info for each blocklet. sIf there are millions 
blocklets, and hundreds partitions, the compatutaion complexity will be 
hundreds millions.
3. In the prunning, It will create filterexecuter pre blocklet, which 
involves a huge performance degrade when there are serveral millions blocklet.
Specically, The creating of filterexecuter is a heavy operation which 
involves a lot of time cost init works.

What changes were proposed in this PR?
1.1 if the input is an empty blocklet list in the getExtendblocklet 
function, we return an empty extendblocklet list directyly
1.2 We trigger the getExtendblocklet functon for each segment instead of 
each blocklet.
2.1 Remove validatePartitionInfo. Add the validatePartiionInfo in the 
getDataMap processing
3.1 We create filterexecuter per segment instead of that per blocklet, and 
share the filterexecuter between all blocklets.
In the case, add column or change sort column, then update the segment, 
there will be serveral different columnschemas of blocklets which exist in the 
segment, only if the columnshemas of all the blocklets are same, the 
filterexecuter can be shared. So we add a fingerprinter for each blocklet, to 
identify the columnschema. If the fingerprinters are same, which means that the 
columnschema are equal with each other, so the filterexecuter can be reused

Does this PR introduce any user interface change?
No.

Is any new testcase added?
Yes.

This closes #3620
---
 .../core/constants/CarbonCommonConstants.java  |  10 +-
 .../carbondata/core/datamap/TableDataMap.java  |  60 +--
 .../carbondata/core/datamap/dev/DataMap.java   |   5 +-
 .../datamap/dev/cgdatamap/CoarseGrainDataMap.java  |   3 +-
 .../datamap/dev/fgdatamap/FineGrainDataMap.java|   3 +-
 .../core/datastore/block/SegmentProperties.java| 112 -
 .../indexstore/blockletindex/BlockDataMap.java |  29 +++---
 .../blockletindex/BlockletDataMapFactory.java  |   9 +-
 .../carbondata/core/util/CarbonProperties.java |  31 ++
 .../datastore/block/SegmentPropertiesTest.java |  36 ++-
 docs/configuration-parameters.md   |   1 +
 .../datamap/bloom/BloomCoarseGrainDataMap.java |   3 +-
 .../datamap/lucene/LuceneFineGrainDataMap.java |   4 +-
 .../blockprune/BlockPruneQueryTestCase.scala   |  89 ++--
 .../testsuite/datamap/CGDataMapTestCase.scala  |   6 +-
 .../testsuite/datamap/FGDataMapTestCase.scala  |   6 +-
 16 files changed, 353 insertions(+), 54 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 333c7b6..c020fc2 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1499,8 +1499,14 @@ public final class CarbonCommonConstants {
 
   public static final String 
CARBON_MAX_DRIVER_THREADS_FOR_BLOCK_PRUNING_DEFAULT = "4";
 
-  // block prune in multi-thread if files size more than 100K files.
-  public static final int 
CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT = 10;
+  // block prune in multi-thread if files count more than specify threshold.
+  @CarbonProperty
+  public static final String 
CARBON_DRIVER_PRUNING_MULTI_THREAD_ENABLE_FILES_COUNT =
+  

[carbondata] branch master updated: [CARBONDATA-3731] Avoid data copy in Writer

2020-03-05 Thread ajantha
This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
 new 8808e9c  [CARBONDATA-3731] Avoid data copy in Writer
8808e9c is described below

commit 8808e9c65b404007d0b553e6f722a5a83aef4b8d
Author: Jacky Li 
AuthorDate: Sat Feb 29 14:38:38 2020 +0800

[CARBONDATA-3731] Avoid data copy in Writer

Why is this PR needed?
For variable length column like String and Binary, currently there are 5 
data copies during data write process, in 
CarbonFactDataHandlerColumnar.processDataRows

What changes were proposed in this PR?
This PR avoids unnecessary copies:

reduce from 5 copy to 1 copy by using DirectByteBuffer: copy data into 
column page backed by a ByteBuffer (DirectBuffer)
use Snappy to compress DirectBuffer directly and output compressed data in 
another ByteBuffer (DirectBuffer)

Does this PR introduce any user interface change?
No

Is any new testcase added?
Yes

This closes #3638
---
 .../columnar/DummyBlockIndexerStorage.java |  60 +++
 .../datastore/compression/AbstractCompressor.java  |  36 +-
 .../core/datastore/compression/Compressor.java |  15 +-
 .../core/datastore/compression/GzipCompressor.java |  67 +++-
 .../datastore/compression/SnappyCompressor.java|  71 +---
 .../core/datastore/compression/ZstdCompressor.java |  16 +-
 .../carbondata/core/datastore/page/ColumnPage.java |  61 +--
 .../core/datastore/page/DecimalColumnPage.java |   2 +-
 .../datastore/page/LVByteBufferColumnPage.java | 420 +
 .../core/datastore/page/LocalDictColumnPage.java   |  28 +-
 .../datastore/page/UnsafeFixLengthColumnPage.java  |  12 +
 .../datastore/page/UnsafeVarLengthColumnPage.java  |   2 +-
 .../page/UnsafeVarLengthColumnPageBase.java|  60 +++
 .../datastore/page/VarLengthColumnPageBase.java|  36 +-
 .../datastore/page/encoding/ColumnPageEncoder.java |   8 +-
 .../page/encoding/DefaultEncodingFactory.java  |   4 +-
 .../datastore/page/encoding/EncodedColumnPage.java |   8 +-
 .../page/encoding/adaptive/AdaptiveCodec.java  |  13 +-
 .../adaptive/AdaptiveDeltaFloatingCodec.java   |   9 +-
 .../adaptive/AdaptiveDeltaIntegralCodec.java   |   9 +-
 .../encoding/adaptive/AdaptiveFloatingCodec.java   |   9 +-
 .../encoding/adaptive/AdaptiveIntegralCodec.java   |   9 +-
 .../encoding/compress/DirectCompressCodec.java |   3 +-
 .../legacy/ComplexDimensionIndexCodec.java |   3 +-
 .../dimension/legacy/DictDimensionIndexCodec.java  |  77 
 .../dimension/legacy/IndexStorageEncoder.java  |  37 +-
 ...dexCodec.java => PlainDimensionIndexCodec.java} |  29 +-
 .../core/datastore/page/encoding/rle/RLECodec.java |   5 +-
 .../statistics/LVLongStringStatsCollector.java |  51 ---
 .../statistics/LVShortStringStatsCollector.java|  50 ---
 ...atsCollector.java => StringStatsCollector.java} |  22 +-
 .../core/indexstore/ExtendedBlockletWrapper.java   |   3 +-
 .../carbondata/core/util/BlockletDataMapUtil.java  |   6 +-
 .../hadoop/ft/CarbonTableInputFormatTest.java  |   1 +
 .../dataload/TestLoadDataWithCompression.scala |  16 +-
 .../testsuite/datamap/CGDataMapTestCase.scala  |   4 +-
 .../testsuite/datamap/FGDataMapTestCase.scala  |  10 +-
 .../sortexpr/AllDataTypesTestCaseSort.scala|  57 +++
 .../carbondata/processing/store/TablePage.java |  24 +-
 .../carbondata/sdk/file/CarbonWriterBuilder.java   |   6 +-
 40 files changed, 908 insertions(+), 451 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/DummyBlockIndexerStorage.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/DummyBlockIndexerStorage.java
new file mode 100644
index 000..4049a41
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/DummyBlockIndexerStorage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.columnar;
+
+/**
+ * This is for all dimension except 

[carbondata] branch master updated: [CARBONDATA-3735] Avoid listing all tables in metastore

2020-03-05 Thread liuzhi
This is an automated email from the ASF dual-hosted git repository.

liuzhi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
 new ed3610c  [CARBONDATA-3735] Avoid listing all tables in metastore
ed3610c is described below

commit ed3610c070f6d174e5e6e84b6f9304c2684f
Author: Jacky Li 
AuthorDate: Wed Mar 4 16:30:44 2020 +0800

[CARBONDATA-3735] Avoid listing all tables in metastore

Why is this PR needed?
In CarbonCreateDataSourceTableCommand.scala, 
RegisterIndexTableCommand.scala, carbon is trying to list all tables in a 
database.
It will be slow if there are many tables in the db, thus should be avoided.

What changes were proposed in this PR?
This PR uses catalog tableExists API instead of listing all tables

Does this PR introduce any user interface change?
No

Is any new testcase added?
No

This closes #3655
---
 .../command/table/CarbonCreateDataSourceTableCommand.scala   | 12 +---
 .../execution/command/table/CarbonShowTablesCommand.scala|  2 +-
 .../secondaryindex/command/RegisterIndexTableCommand.scala   |  7 ---
 3 files changed, 6 insertions(+), 15 deletions(-)

diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
index 0274bd4..c94835f 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateDataSourceTableCommand.scala
@@ -40,17 +40,7 @@ case class CarbonCreateDataSourceTableCommand(
 assert(table.tableType != CatalogTableType.VIEW)
 assert(table.provider.isDefined)
 val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-val sessionState = sparkSession.sessionState
-val db = 
table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-val existingTables = sessionState.catalog.listTables(db)
-var tableExist = false
-existingTables.foreach { tid =>
-  if (tid.table.equalsIgnoreCase(table.identifier.table)
-  && tid.database.getOrElse("").equalsIgnoreCase(db)) {
-tableExist = true
-  }
-}
-if (tableExist) {
+if (sparkSession.sessionState.catalog.tableExists(table.identifier)) {
   if (ignoreIfExists) {
 return Seq.empty[Row]
   } else {
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala
index d50b766..c14264d 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala
@@ -39,7 +39,7 @@ private[sql] case class CarbonShowTablesCommand ( 
databaseName: Option[String],
 // instead of calling tables in sparkSession.
 val catalog = sparkSession.sessionState.catalog
 val db = databaseName.getOrElse(catalog.getCurrentDatabase)
-var tables =
+val tables =
   tableIdentifierPattern.map(catalog.listTables(db, 
_)).getOrElse(catalog.listTables(db))
 val externalCatalog = sparkSession.sharedState.externalCatalog
 // this method checks whether the table is mainTable or datamap based on 
property "isVisible"
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/RegisterIndexTableCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/RegisterIndexTableCommand.scala
index 1caaa34..3a8e595 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/RegisterIndexTableCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/RegisterIndexTableCommand.scala
@@ -55,15 +55,16 @@ case class RegisterIndexTableCommand(dbName: 
Option[String], indexTableName: Str
 setAuditTable(databaseName, indexTableName)
 setAuditInfo(Map("Parent TableName" -> parentTable))
 // 1. check if the main and index table exist
-val tables: Seq[TableIdentifier] = 
sparkSession.sessionState.catalog.listTables(databaseName)
-if (!tables.exists(_.table.equalsIgnoreCase(parentTable))) {
+if (!sparkSession.sessionState.catalog.tableExists(
+  TableIdentifier(parentTable, Some(databaseName {
   val message: String = s"Secondary Index Table registration for table 
[$indexTableName] with" +
 s" table" +
 s" [$databaseName.$parentTable] failed." +
 s"Table