This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c28ee6  [CARBONDATA-3714] Support specify order type when list stage files
5c28ee6 is described below

commit 5c28ee61433c4529d86a7611cf9768884f6c59bc
Author: liuzhi <371684...@qq.com>
AuthorDate: Tue Feb 18 16:46:35 2020 +0800

    [CARBONDATA-3714] Support specify order type when list stage files
    
    Why is this PR needed?
    Sometimes, users want to load the latest data into the table first.
    
    What changes were proposed in this PR?
    Add "batch_file_order" option for CarbonInsertFromStagesCommand.
    
    Does this PR introduce any user interface change?
    Yes. (One option "batch_file_order" is added for CarbonInsertFromStageCommand; documentation added.)
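
    For illustration, a minimal usage sketch of the new option (the table name
    table1 is only an example; it mirrors the example added to the docs below):

        INSERT INTO table1 STAGE OPTIONS('batch_file_order'='DESC')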
    
    Is any new testcase added?
    Yes
    
    This closes #3628
---
 docs/dml-of-carbondata.md                          | 14 +++++
 .../org/apache/carbon/flink/TestCarbonWriter.scala | 15 ++---
 .../management/CarbonInsertFromStageCommand.scala  | 70 +++++++++++++++++-----
 3 files changed, 78 insertions(+), 21 deletions(-)

diff --git a/docs/dml-of-carbondata.md b/docs/dml-of-carbondata.md
index 9d935c8..49e5664 100644
--- a/docs/dml-of-carbondata.md
+++ b/docs/dml-of-carbondata.md
@@ -323,6 +323,7 @@ CarbonData DML statements are documented here,which includes:
 | Property                                                | Description                                                  |
 | ------------------------------------------------------- | ------------------------------------------------------------ |
 | [BATCH_FILE_COUNT](#batch_file_count)                   | The number of stage files per processing                     |
+| [BATCH_FILE_ORDER](#batch_file_order)                   | The order of stage files in each processing batch            |
 
 -
   You can use the following options to load data:
@@ -334,11 +335,24 @@ CarbonData DML statements are documented here,which includes:
     OPTIONS('batch_file_count'='5')
     ```
 
+  - ##### BATCH_FILE_ORDER:
+    The order of stage files in each processing batch; choices: ASC, DESC.
+    The default is ASC.
+    Stage files will be sorted by their last modified time in the specified order.
+
+    ```
+    OPTIONS('batch_file_order'='DESC')
+    ```
+
   Examples:
   ```
   INSERT INTO table1 STAGE
 
   INSERT INTO table1 STAGE OPTIONS('batch_file_count' = '5')
+  Note: This command uses the default file order and will insert the earliest stage files into the table.
+
+  INSERT INTO table1 STAGE OPTIONS('batch_file_count' = '5', 'batch_file_order'='DESC')
+  Note: This command will insert the latest stage files into the table.
   ```
 
 ### Load Data Using Static Partition 
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
index 1d82a75..396703d 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
 import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.test.util.QueryTest
+
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -37,7 +38,6 @@ class TestCarbonWriter extends QueryTest {
   val tableName = "test_flink"
   val bucketTableName = "insert_bucket_table"
 
-
   test("Writing flink data to local carbon table") {
     sql(s"DROP TABLE IF EXISTS $tableName").collect()
     sql(
@@ -281,9 +281,9 @@ class TestCarbonWriter extends QueryTest {
 
       val plan = sql(
         s"""
-          |select t1.*, t2.*
-          |from $tableName t1, $bucketTableName t2
-          |where t1.stringField = t2.stringField
+           |select t1.*, t2.*
+           |from $tableName t1, $bucketTableName t2
+           |where t1.stringField = t2.stringField
       """.stripMargin).queryExecution.executedPlan
       var shuffleExists = false
       plan.collect {
@@ -297,9 +297,9 @@ class TestCarbonWriter extends QueryTest {
 
       checkAnswer(sql(
         s"""select count(*) from
-          |(select t1.*, t2.*
-          |from $tableName t1, $bucketTableName t2
-          |where t1.stringField = t2.stringField) temp
+           |(select t1.*, t2.*
+           |from $tableName t1, $bucketTableName t2
+           |where t1.stringField = t2.stringField) temp
       """.stripMargin), Row(1000))
     } finally {
       sql(s"DROP TABLE IF EXISTS $tableName").collect()
@@ -307,6 +307,7 @@ class TestCarbonWriter extends QueryTest {
     }
   }
 
+
   private def newWriterProperties(
     dataTempPath: String,
     storeLocation: String) = {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index d63ec24..b971dcd 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -27,7 +27,6 @@ import scala.collection.JavaConverters._
 import com.google.gson.Gson
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.InputSplit
-import org.apache.log4j.Logger
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 import org.apache.spark.sql.util.SparkSQLUtil
@@ -35,7 +34,7 @@ import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.{AbstractDFSCarbonFile, CarbonFile}
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{ColumnarFormatVersion, SegmentFileStore}
@@ -61,12 +60,9 @@ case class CarbonInsertFromStageCommand(
     options: Map[String, String]
 ) extends DataCommand {
 
-  @transient var LOGGER: Logger = _
-
-  val DELETE_FILES_RETRY_TIMES = 3
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   override def processData(spark: SparkSession): Seq[Row] = {
-    LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     Checker.validateTableExists(databaseNameOp, tableName, spark)
     val table = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(spark)
     val hadoopConf = spark.sessionState.newHadoopConf()
@@ -118,15 +114,31 @@ case class CarbonInsertFromStageCommand(
 
       // 1) read all existing stage files
       val batchSize = try {
-        Integer.valueOf(options.getOrElse("batch_file_count", Integer.MAX_VALUE.toString))
+        Integer.valueOf(
+          options.getOrElse(CarbonInsertFromStageCommand.BATCH_FILE_COUNT_KEY,
+            CarbonInsertFromStageCommand.BATCH_FILE_COUNT_DEFAULT))
       } catch {
         case _: NumberFormatException =>
-          throw new MalformedCarbonCommandException("Option [batch_file_count] is not a number.")
+          throw new MalformedCarbonCommandException("Option [" +
+            CarbonInsertFromStageCommand.BATCH_FILE_COUNT_KEY + "] is not a 
number.")
       }
       if (batchSize < 1) {
-        throw new MalformedCarbonCommandException("Option [batch_file_count] is less than 1.")
+        throw new MalformedCarbonCommandException("Option [" +
+            CarbonInsertFromStageCommand.BATCH_FILE_COUNT_KEY + "] is less 
than 1.")
+      }
+      val orderType = options.getOrElse(CarbonInsertFromStageCommand.BATCH_FILE_ORDER_KEY,
+        CarbonInsertFromStageCommand.BATCH_FILE_ORDER_DEFAULT)
+      if (!orderType.equalsIgnoreCase(CarbonInsertFromStageCommand.BATCH_FILE_ORDER_ASC) &&
+        !orderType.equalsIgnoreCase(CarbonInsertFromStageCommand.BATCH_FILE_ORDER_DESC)) {
+        throw new MalformedCarbonCommandException("Option [" +
+            CarbonInsertFromStageCommand.BATCH_FILE_ORDER_KEY + "] is invalid, 
should be " +
+            CarbonInsertFromStageCommand.BATCH_FILE_ORDER_ASC + " or " +
+            CarbonInsertFromStageCommand.BATCH_FILE_ORDER_DESC + ".")
       }
-      val stageFiles = listStageFiles(stagePath, hadoopConf, batchSize)
+      LOGGER.info("Option [" + CarbonInsertFromStageCommand.BATCH_FILE_ORDER_KEY +
+                  "] value is " + orderType)
+      val stageFiles = listStageFiles(stagePath, hadoopConf, batchSize,
+        orderType.equalsIgnoreCase(CarbonInsertFromStageCommand.BATCH_FILE_ORDER_ASC))
       if (stageFiles.isEmpty) {
         // no stage files, so do nothing
         LOGGER.warn("files not found under stage metadata folder")
@@ -446,7 +458,7 @@ case class CarbonInsertFromStageCommand(
       executorService: ExecutorService,
       stageFiles: Array[(CarbonFile, CarbonFile)]): Unit = {
     val startTime = System.currentTimeMillis()
-    var retry = DELETE_FILES_RETRY_TIMES
+    var retry = CarbonInsertFromStageCommand.DELETE_FILES_RETRY_TIMES
     while (deleteStageFiles(executorService, stageFiles).length > 0 && retry > 0) {
       retry -= 1
     }
@@ -484,7 +496,7 @@ case class CarbonInsertFromStageCommand(
     if (table.isHivePartitionTable) {
       return
     }
-    var retries = DELETE_FILES_RETRY_TIMES
+    var retries = CarbonInsertFromStageCommand.DELETE_FILES_RETRY_TIMES
     while(deleteSnapShotFile(snapshotFilePath) && retries > 0) {
       retries -= 1
     }
@@ -497,7 +509,8 @@ case class CarbonInsertFromStageCommand(
   private def listStageFiles(
       loadDetailsDir: String,
       hadoopConf: Configuration,
-      batchSize: Int
+      batchSize: Int,
+      ascendingSort: Boolean
   ): Array[(CarbonFile, CarbonFile)] = {
     val dir = FileFactory.getCarbonFile(loadDetailsDir, hadoopConf)
     if (dir.exists()) {
@@ -518,7 +531,12 @@ case class CarbonInsertFromStageCommand(
       }.filter { file =>
         successFiles.contains(file.getName)
       }.sortWith {
-        (file1, file2) => file1.getLastModifiedTime < file2.getLastModifiedTime
+        (file1, file2) =>
+          if (ascendingSort) {
+            file1.getLastModifiedTime < file2.getLastModifiedTime
+          } else {
+            file1.getLastModifiedTime > file2.getLastModifiedTime
+          }
       }.map { file =>
         (file, successFiles(file.getName))
       }
@@ -556,3 +574,27 @@ case class CarbonInsertFromStageCommand(
 
   override protected def opName: String = "INSERT STAGE"
 }
+
+object CarbonInsertFromStageCommand {
+
+  val DELETE_FILES_RETRY_TIMES = 3
+
+  val BATCH_FILE_COUNT_KEY = "batch_file_count"
+
+  val BATCH_FILE_COUNT_DEFAULT: String = Integer.MAX_VALUE.toString
+
+  val BATCH_FILE_ORDER_KEY = "batch_file_order"
+
+  /**
+   * Using this option will insert the earliest stage files into the table.
+   */
+  val BATCH_FILE_ORDER_ASC = "ASC"
+
+  /**
+   * Using this option will insert the latest stage files into the table.
+   */
+  val BATCH_FILE_ORDER_DESC = "DESC"
+
+  val BATCH_FILE_ORDER_DEFAULT: String = BATCH_FILE_ORDER_ASC
+
+}
