This is an automated email from the ASF dual-hosted git repository. jackylk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 5c28ee6 [CARBONDATA-3714] Support specify order type when list stage files 5c28ee6 is described below commit 5c28ee61433c4529d86a7611cf9768884f6c59bc Author: liuzhi <371684...@qq.com> AuthorDate: Tue Feb 18 16:46:35 2020 +0800 [CARBONDATA-3714] Support specify order type when list stage files Why is this PR needed? Sometimes users want to load the latest data into the table first. What changes were proposed in this PR? Add "batch_file_order" option for CarbonInsertFromStageCommand. Does this PR introduce any user interface change? Yes. (One option "batch_file_order" is added for CarbonInsertFromStageCommand, document added) Is any new testcase added? Yes This closes #3628 --- docs/dml-of-carbondata.md | 14 +++++ .../org/apache/carbon/flink/TestCarbonWriter.scala | 15 ++--- .../management/CarbonInsertFromStageCommand.scala | 70 +++++++++++++++++----- 3 files changed, 78 insertions(+), 21 deletions(-) diff --git a/docs/dml-of-carbondata.md b/docs/dml-of-carbondata.md index 9d935c8..49e5664 100644 --- a/docs/dml-of-carbondata.md +++ b/docs/dml-of-carbondata.md @@ -323,6 +323,7 @@ CarbonData DML statements are documented here,which includes: | Property | Description | | ------------------------------------------------------- | ------------------------------------------------------------ | | [BATCH_FILE_COUNT](#batch_file_count) | The number of stage files per processing | +| [BATCH_FILE_ORDER](#batch_file_order) | The order type of stage files per processing | - You can use the following options to load data: @@ -334,11 +335,24 @@ CarbonData DML statements are documented here,which includes: OPTIONS('batch_file_count'='5') ``` + - ##### BATCH_FILE_ORDER: + The order type of stage files per processing, choices: ASC, DESC. + The default is ASC. + Stage files are ordered by the last modified time with the specified order type. 
+ + ``` + OPTIONS('batch_file_order'='DESC') + ``` + Examples: ``` INSERT INTO table1 STAGE INSERT INTO table1 STAGE OPTIONS('batch_file_count' = '5') + Note: This command uses the default file order and will insert the earliest stage files into the table. + + INSERT INTO table1 STAGE OPTIONS('batch_file_count' = '5', 'batch_file_order'='DESC') + Note: This command will insert the latest stage files into the table. ``` ### Load Data Using Static Partition diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala index 1d82a75..396703d 100644 --- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala +++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink import org.apache.spark.sql.{CarbonEnv, Row} import org.apache.spark.sql.test.util.QueryTest + import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath @@ -37,7 +38,6 @@ class TestCarbonWriter extends QueryTest { val tableName = "test_flink" val bucketTableName = "insert_bucket_table" - test("Writing flink data to local carbon table") { sql(s"DROP TABLE IF EXISTS $tableName").collect() sql( @@ -281,9 +281,9 @@ class TestCarbonWriter extends QueryTest { val plan = sql( s""" - |select t1.*, t2.* - |from $tableName t1, $bucketTableName t2 - |where t1.stringField = t2.stringField + |select t1.*, t2.* + |from $tableName t1, $bucketTableName t2 + |where t1.stringField = t2.stringField """.stripMargin).queryExecution.executedPlan var shuffleExists = false plan.collect { @@ -297,9 +297,9 @@ class TestCarbonWriter extends QueryTest { checkAnswer(sql( s"""select 
count(*) from - |(select t1.*, t2.* - |from $tableName t1, $bucketTableName t2 - |where t1.stringField = t2.stringField) temp + |(select t1.*, t2.* + |from $tableName t1, $bucketTableName t2 + |where t1.stringField = t2.stringField) temp """.stripMargin), Row(1000)) } finally { sql(s"DROP TABLE IF EXISTS $tableName").collect() @@ -307,6 +307,7 @@ class TestCarbonWriter extends QueryTest { } } + private def newWriterProperties( dataTempPath: String, storeLocation: String) = { diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala index d63ec24..b971dcd 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala @@ -27,7 +27,6 @@ import scala.collection.JavaConverters._ import com.google.gson.Gson import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.InputSplit -import org.apache.log4j.Logger import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.execution.command.{Checker, DataCommand} import org.apache.spark.sql.util.SparkSQLUtil @@ -35,7 +34,7 @@ import org.apache.spark.sql.util.SparkSQLUtil import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.filesystem.{AbstractDFSCarbonFile, CarbonFile} +import org.apache.carbondata.core.datastore.filesystem.CarbonFile import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage} import 
org.apache.carbondata.core.metadata.{ColumnarFormatVersion, SegmentFileStore} @@ -61,12 +60,9 @@ case class CarbonInsertFromStageCommand( options: Map[String, String] ) extends DataCommand { - @transient var LOGGER: Logger = _ - - val DELETE_FILES_RETRY_TIMES = 3 + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) override def processData(spark: SparkSession): Seq[Row] = { - LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) Checker.validateTableExists(databaseNameOp, tableName, spark) val table = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(spark) val hadoopConf = spark.sessionState.newHadoopConf() @@ -118,15 +114,31 @@ case class CarbonInsertFromStageCommand( // 1) read all existing stage files val batchSize = try { - Integer.valueOf(options.getOrElse("batch_file_count", Integer.MAX_VALUE.toString)) + Integer.valueOf( + options.getOrElse(CarbonInsertFromStageCommand.BATCH_FILE_COUNT_KEY, + CarbonInsertFromStageCommand.BATCH_FILE_COUNT_DEFAULT)) } catch { case _: NumberFormatException => - throw new MalformedCarbonCommandException("Option [batch_file_count] is not a number.") + throw new MalformedCarbonCommandException("Option [" + + CarbonInsertFromStageCommand.BATCH_FILE_COUNT_KEY + "] is not a number.") } if (batchSize < 1) { - throw new MalformedCarbonCommandException("Option [batch_file_count] is less than 1.") + throw new MalformedCarbonCommandException("Option [" + + CarbonInsertFromStageCommand.BATCH_FILE_COUNT_KEY + "] is less than 1.") + } + val orderType = options.getOrElse(CarbonInsertFromStageCommand.BATCH_FILE_ORDER_KEY, + CarbonInsertFromStageCommand.BATCH_FILE_ORDER_DEFAULT) + if (!orderType.equalsIgnoreCase(CarbonInsertFromStageCommand.BATCH_FILE_ORDER_ASC) && + !orderType.equalsIgnoreCase(CarbonInsertFromStageCommand.BATCH_FILE_ORDER_DESC)) { + throw new MalformedCarbonCommandException("Option [" + + CarbonInsertFromStageCommand.BATCH_FILE_ORDER_KEY + "] is invalid, should be " + + 
CarbonInsertFromStageCommand.BATCH_FILE_ORDER_ASC + " or " + + CarbonInsertFromStageCommand.BATCH_FILE_ORDER_DESC + ".") } - val stageFiles = listStageFiles(stagePath, hadoopConf, batchSize) + LOGGER.info("Option [" + CarbonInsertFromStageCommand.BATCH_FILE_ORDER_KEY + + "] value is " + orderType) + val stageFiles = listStageFiles(stagePath, hadoopConf, batchSize, + orderType.equalsIgnoreCase(CarbonInsertFromStageCommand.BATCH_FILE_ORDER_ASC)) if (stageFiles.isEmpty) { // no stage files, so do nothing LOGGER.warn("files not found under stage metadata folder") @@ -446,7 +458,7 @@ case class CarbonInsertFromStageCommand( executorService: ExecutorService, stageFiles: Array[(CarbonFile, CarbonFile)]): Unit = { val startTime = System.currentTimeMillis() - var retry = DELETE_FILES_RETRY_TIMES + var retry = CarbonInsertFromStageCommand.DELETE_FILES_RETRY_TIMES while (deleteStageFiles(executorService, stageFiles).length > 0 && retry > 0) { retry -= 1 } @@ -484,7 +496,7 @@ case class CarbonInsertFromStageCommand( if (table.isHivePartitionTable) { return } - var retries = DELETE_FILES_RETRY_TIMES + var retries = CarbonInsertFromStageCommand.DELETE_FILES_RETRY_TIMES while(deleteSnapShotFile(snapshotFilePath) && retries > 0) { retries -= 1 } @@ -497,7 +509,8 @@ case class CarbonInsertFromStageCommand( private def listStageFiles( loadDetailsDir: String, hadoopConf: Configuration, - batchSize: Int + batchSize: Int, + ascendingSort: Boolean ): Array[(CarbonFile, CarbonFile)] = { val dir = FileFactory.getCarbonFile(loadDetailsDir, hadoopConf) if (dir.exists()) { @@ -518,7 +531,12 @@ case class CarbonInsertFromStageCommand( }.filter { file => successFiles.contains(file.getName) }.sortWith { - (file1, file2) => file1.getLastModifiedTime < file2.getLastModifiedTime + (file1, file2) => + if (ascendingSort) { + file1.getLastModifiedTime < file2.getLastModifiedTime + } else { + file1.getLastModifiedTime > file2.getLastModifiedTime + } }.map { file => (file, successFiles(file.getName)) } 
@@ -556,3 +574,27 @@ case class CarbonInsertFromStageCommand(
 
   override protected def opName: String = "INSERT STAGE"
 }
+
+object CarbonInsertFromStageCommand {
+
+  val DELETE_FILES_RETRY_TIMES = 3 // retry budget when deleting stage files / snapshot file
+
+  val BATCH_FILE_COUNT_KEY = "batch_file_count" // option: number of stage files per processing
+
+  val BATCH_FILE_COUNT_DEFAULT: String = Integer.MAX_VALUE.toString // kept as String: parsed like a user value
+
+  val BATCH_FILE_ORDER_KEY = "batch_file_order" // option: sort order of stage files (ASC or DESC)
+
+  /**
+   * Ascending by last modified time: the earliest stage files are inserted into the table.
+   */
+  val BATCH_FILE_ORDER_ASC = "ASC"
+
+  /**
+   * Descending by last modified time: the latest stage files are inserted into the table.
+   */
+  val BATCH_FILE_ORDER_DESC = "DESC"
+
+  val BATCH_FILE_ORDER_DEFAULT: String = BATCH_FILE_ORDER_ASC // must be ASC or DESC, not the option key
+
+}