This is an automated email from the ASF dual-hosted git repository. jackylk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 45ccdf0 [CARBONDATA-3524] Support global_sort compaction 45ccdf0 is described below commit 45ccdf04fe82a9929623bae2e456ad2852041b60 Author: QiangCai <qiang...@qq.com> AuthorDate: Mon Sep 16 20:33:36 2019 +0800 [CARBONDATA-3524] Support global_sort compaction [Background] For a GLOBAL_SORT table, currently the segments will be compacted in LOCAL_SORT. [Motivation] After compaction, it may impact query performance. Better to use GLOBAL_SORT compaction to improve the performance. [Limitation] Range_Column still uses the old flow to load data; only supports standard tables This closes #3389 --- .../CompactionSupportGlobalSortFunctionTest.scala | 19 ++- .../CompactionSupportGlobalSortParameterTest.scala | 2 +- .../dataload/TestGlobalSortDataLoad.scala | 92 ++++++++++++++ .../apache/spark/sql/util/SparkTypeConverter.scala | 2 +- .../spark/rdd/CarbonTableCompactor.scala | 138 ++++++++++++++++++++- .../org/apache/spark/util/AlterTableUtil.scala | 3 +- 6 files changed, 248 insertions(+), 8 deletions(-) diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala index bad8bdc..ed0f466 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala @@ -42,7 +42,7 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf """ | CREATE TABLE compaction_globalsort(id INT, name STRING, city STRING, age INT) | STORED BY 'org.apache.carbondata.format' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 
'SORT_SCOPE'='GLOBAL_SORT') + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT', 'GLOBAL_SORT_PARTITIONS'='1') """.stripMargin) sql("DROP TABLE IF EXISTS carbon_localsort") @@ -68,6 +68,7 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort") sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort") + sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')") sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'") checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") @@ -108,6 +109,7 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')") sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'") checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted") @@ -138,6 +140,7 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')") sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'") checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted") @@ -174,6 +177,7 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')") sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'") val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort") @@ -206,6 +210,7 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest 
with BeforeAndAf checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')") sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'") checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted") @@ -245,6 +250,7 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')") sql("ALTER TABLE compaction_globalsort COMPACT 'major'") sql("clean files for table compaction_globalsort") @@ -279,7 +285,7 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") - + sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')") sql("ALTER TABLE compaction_globalsort COMPACT 'minor'") sql("clean files for table compaction_globalsort") @@ -312,6 +318,7 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')") sql("ALTER TABLE compaction_globalsort COMPACT 'major'") sql("clean files for table compaction_globalsort") @@ -347,6 +354,7 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") + sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='2')") sql("ALTER TABLE compaction_globalsort COMPACT 'major'") sql("clean files for table compaction_globalsort") @@ -378,7 +386,7 @@ class 
CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort") checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name") - + sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='2')") sql("ALTER TABLE compaction_globalsort COMPACT 'major'") sql("clean files for table compaction_globalsort") checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted") @@ -438,6 +446,7 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_localsort") sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE compaction_globalsort") } + sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='2')") sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'") assert(getIndexFileCount("compaction_globalsort", "0.1") === 2) @@ -453,6 +462,7 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_localsort") sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE compaction_globalsort") } + sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='2')") sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'") assert(getIndexFileCount("compaction_globalsort", "0.1") === 2) @@ -479,6 +489,7 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort") sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort") + sql("alter table compaction_globalsort set tblproperties('global_sort_partitions'='1')") sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'") sql("clean files for table compaction_globalsort") @@ -503,7 +514,7 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with 
BeforeAndAf """ | CREATE TABLE compaction_globalsort2(id INT, name STRING, city STRING, age INT) | STORED BY 'org.apache.carbondata.format' - | TBLPROPERTIES('SORT_COLUMNS'='id','SORT_SCOPE'='GLOBAL_SORT') + | TBLPROPERTIES('SORT_COLUMNS'='id','SORT_SCOPE'='GLOBAL_SORT', 'global_sort_partitions'='1') """.stripMargin) sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort2") sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort2") diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala index 4b027b9..a1d0fe0 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala @@ -43,7 +43,7 @@ class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndA """ | CREATE TABLE compaction_globalsort(id INT, name STRING, city STRING, age INT) | STORED BY 'org.apache.carbondata.format' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT') + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT', 'GLOBAL_SORT_PARTITIONS'='1') """.stripMargin) sql("DROP TABLE IF EXISTS carbon_localsort") diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala index 5eeee78..255f399 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala @@ -85,6 +85,9 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo sql("DROP TABLE IF EXISTS carbon_globalsort2") sql("DROP TABLE IF EXISTS carbon_globalsort_partitioned") sql("DROP TABLE IF EXISTS carbon_globalsort_difftypes") + sql("DROP TABLE IF EXISTS carbon_globalsort_minor") + sql("DROP TABLE IF EXISTS carbon_globalsort_major") + sql("DROP TABLE IF EXISTS carbon_globalsort_custom") } // ----------------------------------- Compare Result ----------------------------------- @@ -183,6 +186,95 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo sql("SELECT * FROM carbon_localsort_twice ORDER BY name, id")) } + test("Compaction GLOBAL_SORT: minor") { + sql("DROP TABLE IF EXISTS carbon_globalsort_minor") + sql( + """ + | CREATE TABLE carbon_globalsort_minor(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES( + | 'SORT_SCOPE'='GLOBAL_SORT', + | 'sort_columns' = 'name, city', + | 'AUTO_LOAD_MERGE'='false', + | 'COMPACTION_LEVEL_THRESHOLD'='3,0', 'GLOBAL_SORT_PARTITIONS'='3') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_minor") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_minor") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_minor") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_minor") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort") + assertResult(4)(sql("show segments for table carbon_globalsort_minor").count()) + sql("ALTER TABLE carbon_globalsort_minor COMPACT 
'MINOR'") + assertResult(5)(sql("show segments for table carbon_globalsort_minor").count()) + assertResult(3)( + sql("show segments for table carbon_globalsort_minor").rdd.filter(_.get(1).equals("Compacted")).count()) + assert(getIndexFileCount("carbon_globalsort_minor", "0.1") === 3) + checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort_minor"), Seq(Row(48))) + checkAnswer(sql("SELECT * FROM carbon_globalsort_minor ORDER BY name, id"), + sql("SELECT * FROM carbon_globalsort ORDER BY name, id")) + } + + test("Compaction GLOBAL_SORT: major") { + sql("DROP TABLE IF EXISTS carbon_globalsort_major") + sql( + """ + | CREATE TABLE carbon_globalsort_major(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES( + | 'SORT_SCOPE'='GLOBAL_SORT', + | 'sort_columns' = 'name, city', + | 'AUTO_LOAD_MERGE'='false', + | 'MAJOR_COMPACTION_SIZE'='1024', 'GLOBAL_SORT_PARTITIONS'='4') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_major") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_major") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_major") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_major") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort") + assertResult(4)(sql("show segments for table carbon_globalsort_major").count()) + sql("ALTER TABLE carbon_globalsort_major COMPACT 'major'") + assertResult(5)(sql("show segments for table carbon_globalsort_major").count()) + assertResult(4)( + sql("show segments for table carbon_globalsort_major").rdd.filter(_.get(1).equals("Compacted")).count()) + assert(getIndexFileCount("carbon_globalsort_major", "0.1") === 4) + 
checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort_major"), Seq(Row(48))) + checkAnswer(sql("SELECT * FROM carbon_globalsort_major ORDER BY name, id"), + sql("SELECT * FROM carbon_globalsort ORDER BY name, id")) + } + + test("Compaction GLOBAL_SORT: custom") { + sql("DROP TABLE IF EXISTS carbon_globalsort_custom") + sql( + """ + | CREATE TABLE carbon_globalsort_custom(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES( + | 'SORT_SCOPE'='GLOBAL_SORT', + | 'sort_columns' = 'name, city', + | 'AUTO_LOAD_MERGE'='false', 'GLOBAL_SORT_PARTITIONS'='3') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_custom") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_custom") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_custom") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort_custom") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort") + sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_globalsort") + assertResult(4)(sql("show segments for table carbon_globalsort_custom").count()) + sql("ALTER TABLE carbon_globalsort_custom COMPACT 'custom' WHERE SEGMENT.ID IN (0,1,2)") + assertResult(5)(sql("show segments for table carbon_globalsort_custom").count()) + assertResult(3)( + sql("show segments for table carbon_globalsort_custom").rdd.filter(_.get(1).equals("Compacted")).count()) + assert(getIndexFileCount("carbon_globalsort_custom", "0.1") === 3) + checkAnswer(sql("SELECT COUNT(*) FROM carbon_globalsort_custom"), Seq(Row(48))) + checkAnswer(sql("SELECT * FROM carbon_globalsort_custom ORDER BY name, id"), + sql("SELECT * FROM carbon_globalsort_custom ORDER BY name, id")) + } + // ----------------------------------- Check Configurations 
----------------------------------- // Waiting for merge SET feature[CARBONDATA-1065] ignore("DDL > SET") { diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala index 2ea3d43..100ad17 100644 --- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala +++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/SparkTypeConverter.scala @@ -29,7 +29,7 @@ import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataType import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension, ColumnSchema} -private[spark] object SparkTypeConverter { +object SparkTypeConverter { def createSparkSchema(table: CarbonTable, columns: Seq[String]): StructType = { Objects.requireNonNull(table) diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala index 77b7119..bfbf52d 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala @@ -24,21 +24,39 @@ import java.util.concurrent.ExecutorService import scala.collection.JavaConverters._ import scala.collection.mutable -import org.apache.spark.sql.SQLContext +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapreduce.{InputSplit, Job} +import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext} import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel} +import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter} import org.apache.spark.util.MergeIndexUtil 
+import org.apache.spark.CarbonInputMetrics +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.constants.SortScopeOptions.SortScope import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment} import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.datastore.row.CarbonRow +import org.apache.carbondata.core.metadata.datatype.{StructField, StructType} import org.apache.carbondata.core.metadata.SegmentFileStore +import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.events._ +import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat, CarbonTableOutputFormat} +import org.apache.carbondata.hadoop.CarbonProjection +import org.apache.carbondata.processing.loading.FailureCauses +import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType} +import org.apache.carbondata.processing.util.TableOptionConstant +import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark import org.apache.carbondata.spark.MergeResultImpl +import org.apache.carbondata.store.CarbonRowReadSupport /** * This class is used to perform compaction on carbon table. 
@@ -195,6 +213,11 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, carbonLoadModel, carbonMergerMapping ).collect + } else if (SortScope.GLOBAL_SORT == carbonTable.getSortScope && + !carbonTable.getSortColumns.isEmpty && + carbonTable.getRangeColumn == null && + CarbonUtil.isStandardCarbonTable(carbonTable)) { + compactSegmentsByGlobalSort(sc.sparkSession, carbonLoadModel, carbonMergerMapping) } else { new CarbonMergerRDD( sc.sparkSession, @@ -325,4 +348,117 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, } } + /** + * compact segments by global sort + */ + def compactSegmentsByGlobalSort( + sparkSession: SparkSession, + carbonLoadModel: CarbonLoadModel, + carbonMergerMapping: CarbonMergerMapping): Array[(String, Boolean)] = { + val dataFrame = dataFrameOfSegments( + sparkSession, + carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable, + carbonMergerMapping.validSegments) + // generate LoadModel which can be used global_sort flow + val outputModel = getLoadModelForGlobalSort( + sparkSession, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable, + carbonMergerMapping.validSegments) + outputModel.setSegmentId(carbonMergerMapping.mergedLoadName.split("_")(1)) + DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort( + sparkSession, + Option(dataFrame), + outputModel, + SparkSQLUtil.sessionState(sparkSession).newHadoopConf()) + .map { row => + (row._1, FailureCauses.NONE == row._2._2.failureCauses) + } + } + + /** + * create DataFrame basing on specified segments + */ + def dataFrameOfSegments( + sparkSession: SparkSession, + carbonTable: CarbonTable, + segments: Array[Segment] + ): DataFrame = { + val columns = carbonTable + .getCreateOrderColumn(carbonTable.getTableName) + .asScala + .map(_.getColName) + .toArray + val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns) + val rdd: RDD[Row] = new CarbonScanRDD[CarbonRow]( + sparkSession, + columnProjection = new CarbonProjection(columns), + null, + 
carbonTable.getAbsoluteTableIdentifier, + carbonTable.getTableInfo.serialize, + carbonTable.getTableInfo, + new CarbonInputMetrics, + null, + null, + classOf[CarbonRowReadSupport], + splitsOfSegments(sparkSession, carbonTable, segments)) + .map { row => + new GenericRow(row.getData.asInstanceOf[Array[Any]]) + } + sparkSession.createDataFrame(rdd, schema) + } + + /** + * get splits of specified segments + */ + def splitsOfSegments( + sparkSession: SparkSession, + carbonTable: CarbonTable, + segments: Array[Segment] + ): java.util.List[InputSplit] = { + val jobConf = new JobConf(SparkSQLUtil.sessionState(sparkSession).newHadoopConf()) + SparkHadoopUtil.get.addCredentials(jobConf) + val job = Job.getInstance(jobConf) + val conf = job.getConfiguration + CarbonInputFormat.setTablePath(conf, carbonTable.getTablePath) + CarbonInputFormat.setTableInfo(conf, carbonTable.getTableInfo) + CarbonInputFormat.setDatabaseName(conf, carbonTable.getDatabaseName) + CarbonInputFormat.setTableName(conf, carbonTable.getTableName) + CarbonInputFormat.setQuerySegment(conf, segments.map(_.getSegmentNo).mkString(",")) + new CarbonTableInputFormat[Object].getSplits(job) + } + + /** + * create CarbonLoadModel for global_sort compaction + */ + def getLoadModelForGlobalSort( + sparkSession: SparkSession, + carbonTable: CarbonTable, + segments: Array[Segment] + ): CarbonLoadModel = { + val conf = SparkSQLUtil.sessionState(sparkSession).newHadoopConf() + CarbonTableOutputFormat.setDatabaseName(conf, carbonTable.getDatabaseName) + CarbonTableOutputFormat.setTableName(conf, carbonTable.getTableName) + CarbonTableOutputFormat.setCarbonTable(conf, carbonTable) + val fieldList = carbonTable + .getCreateOrderColumn(carbonTable.getTableName) + .asScala + .map { column => + new StructField(column.getColName, column.getDataType) + } + CarbonTableOutputFormat.setInputSchema(conf, new StructType(fieldList.asJava)) + val loadModel = CarbonTableOutputFormat.getLoadModel(conf) + 
loadModel.setSerializationNullFormat( + TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + ",\\N") + loadModel.setBadRecordsLoggerEnable( + TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + ",false") + loadModel.setBadRecordsAction( + TableOptionConstant.BAD_RECORDS_ACTION.getName() + ",force") + loadModel.setIsEmptyDataBadRecord( + DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + ",false") + val globalSortPartitions = + carbonTable.getTableInfo.getFactTable.getTableProperties.get("global_sort_partitions") + if (globalSortPartitions != null) { + loadModel.setGlobalSortPartitions(globalSortPartitions) + } + loadModel + } } diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index 99bc863..5e7039f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -518,7 +518,8 @@ object AlterTableUtil { "LOAD_MIN_SIZE_INMB", "RANGE_COLUMN", "SORT_SCOPE", - "SORT_COLUMNS") + "SORT_COLUMNS", + "GLOBAL_SORT_PARTITIONS") supportedOptions.contains(propKey.toUpperCase) }