This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new b332d90  [CARBONDATA-3667] Insert stage recover processing of the 
partition table throw exception "the unexpected 0 segment found"
b332d90 is described below

commit b332d908846a31aac7ee9fe196802dcfe4346b31
Author: h00424960 <haoxing...@huawei.com>
AuthorDate: Sun Jan 26 21:29:11 2020 +0800

    [CARBONDATA-3667] Insert stage recover processing of the partition table 
throw exception "the unexpected 0 segment found"
    
    Why is this PR needed?
      Recover processing, when executing insert stage on a partition table, throws 
the exception "the unexpected 0 segment found".
      The reason is that the snapshot content of the partition table is wrong.
    
    What changes were proposed in this PR?
      Let's review the purpose of the recover processing: when the segment is loaded 
successfully but the stage files fail to be cleaned, the stage (data) will be loaded 
again, which leads to duplicated data. Previously, we expected to save a snapshot 
file consisting of the segment id and the stage file list, so that stage files which 
failed to be cleaned could be cleaned again once we found that the corresponding 
segment had been loaded successfully. But the segment id can't be obtained while 
loading a partition table; in other words, [...]
      After discussion, we believe that this problem can also be alleviated by 
adding a retry mechanism when deleting stage files. Thus, two changes are 
proposed, as shown below.
      1) Remove recover processing of Insert stage command.
      2) Add Retry to delete stage files.
    
    Does this PR introduce any user interface change?
      No
    
    Is any new testcase added?
      No
    
    This closes #3589
---
 .../management/CarbonInsertFromStageCommand.scala  | 96 +++++++++++++++++-----
 1 file changed, 77 insertions(+), 19 deletions(-)

diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index 0f5c4ae..fe3175a 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command.management
 import java.io.{InputStreamReader, IOException}
 import java.util
 import java.util.Collections
-import java.util.concurrent.{Executors, ExecutorService}
+import java.util.concurrent.{Callable, Executors, ExecutorService}
 
 import scala.collection.JavaConverters._
 
@@ -62,6 +62,8 @@ case class CarbonInsertFromStageCommand(
 
   @transient var LOGGER: Logger = _
 
+  val DELETE_FILES_RETRY_TIMES = 3
+
   override def processData(spark: SparkSession): Seq[Row] = {
     LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     Checker.validateTableExists(databaseNameOp, tableName, spark)
@@ -144,10 +146,10 @@ case class CarbonInsertFromStageCommand(
       }
 
       // 4) delete stage files
-      deleteStageFiles(executorService, stageFiles)
+      deleteStageFilesWithRetry(executorService, stageFiles)
 
       // 5) delete the snapshot file
-      FileFactory.getCarbonFile(snapshotFilePath).delete()
+      deleteSnapShotFileWithRetry(table, snapshotFilePath)
     } catch {
       case ex: Throwable =>
         LOGGER.error(s"failed to insert 
${table.getDatabaseName}.${table.getTableName}", ex)
@@ -165,7 +167,7 @@ case class CarbonInsertFromStageCommand(
       snapshotFilePath: String,
       table: CarbonTable,
       conf: Configuration): Unit = {
-    if (!FileFactory.isFileExist(snapshotFilePath)) {
+    if (!FileFactory.isFileExist(snapshotFilePath) || 
table.isHivePartitionTable) {
       // everything is fine
       return
     }
@@ -189,7 +191,7 @@ case class CarbonInsertFromStageCommand(
       table.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
     if (!lock.lockWithRetries()) {
       throw new RuntimeException(s"Failed to lock table status for " +
-                                 
s"${table.getDatabaseName}.${table.getTableName}")
+        s"${table.getDatabaseName}.${table.getTableName}")
     }
     try {
       val segments = SegmentStatusManager.readTableStatusFile(
@@ -205,7 +207,7 @@ case class CarbonInsertFromStageCommand(
           lock.unlock()
           lock = null
           LOGGER.info(s"Segment $segmentId is in SUCCESS state, about to 
delete " +
-                      s"${stageFileNames.length} stage files")
+            s"${stageFileNames.length} stage files")
           val numThreads = Math.min(Math.max(stageFileNames.length, 1), 10)
           val executorService = Executors.newFixedThreadPool(numThreads)
           stageFileNames.map { fileName =>
@@ -213,7 +215,7 @@ case class CarbonInsertFromStageCommand(
               override def run(): Unit = {
                 FileFactory.getCarbonFile(
                   CarbonTablePath.getStageDir(table.getTablePath) +
-                  CarbonCommonConstants.FILE_SEPARATOR + fileName
+                    CarbonCommonConstants.FILE_SEPARATOR + fileName
                 ).delete()
               }
             })
@@ -223,7 +225,7 @@ case class CarbonInsertFromStageCommand(
         case other =>
           // delete entry in table status and load again
           LOGGER.warn(s"Segment $segmentId is in $other state, about to delete 
the " +
-                      s"segment entry and load again")
+            s"segment entry and load again")
           val segmentToWrite = 
segments.filterNot(_.getLoadName.equals(segmentId))
           SegmentStatusManager.writeLoadDetailsIntoFile(
             CarbonTablePath.getTableStatusFilePath(table.getTablePath),
@@ -307,9 +309,6 @@ case class CarbonInsertFromStageCommand(
     ): Unit = {
     val partitionDataList = listPartitionFiles(stageInput)
 
-    val content = stageFiles.map(_._1.getAbsolutePath).mkString("\n")
-    FileFactory.writeFile(content, snapshotFilePath)
-
     val start = System.currentTimeMillis()
     partitionDataList.map {
       case (partition, splits) =>
@@ -414,23 +413,82 @@ case class CarbonInsertFromStageCommand(
 
   /**
    * Delete stage file and success file
+   * Return false means the stage files were cleaned successfully
+   * While return true means the stage files were failed to clean
    */
   private def deleteStageFiles(
       executorService: ExecutorService,
-      stageFiles: Array[(CarbonFile, CarbonFile)]): Unit = {
-    val startTime = System.currentTimeMillis()
+      stageFiles: Array[(CarbonFile, CarbonFile)]): Array[(CarbonFile, 
CarbonFile)] = {
     stageFiles.map { files =>
-      executorService.submit(new Runnable {
-        override def run(): Unit = {
-          files._1.delete()
-          files._2.delete()
+      executorService.submit(new Callable[Boolean] {
+        override def call(): Boolean = {
+          // If delete() return false, maybe the reason is FileNotFount or 
FileFailedClean.
+          // Considering FileNotFound means FileCleanSucessfully.
+          // We need double check the file exists or not when delete() return 
false.
+          if (!(files._1.delete() && files._2.delete())) {
+            // If the file still exists, return ture, let the file filtered in.
+            // So we can retry to delete this file.
+            return files._1.exists() || files._1.exists()
+          }
+          // When delete successfully, return false, let the file filtered 
away.
+          false
         }
       })
-    }.map { future =>
+    }.filter { future =>
       future.get()
     }
+    stageFiles
+  }
+
+  /**
+   * Delete stage file and success file with retry
+   */
+  private def deleteStageFilesWithRetry(
+      executorService: ExecutorService,
+      stageFiles: Array[(CarbonFile, CarbonFile)]): Unit = {
+    val startTime = System.currentTimeMillis()
+    var retry = DELETE_FILES_RETRY_TIMES
+    while (deleteStageFiles(executorService, stageFiles).length > 0 && retry > 
0) {
+      retry -= 1
+    }
     LOGGER.info(s"finished to delete stage files, time taken: " +
-                s"${System.currentTimeMillis() - startTime}ms")
+      s"${System.currentTimeMillis() - startTime}ms")
+    // if there are still stage files failed to clean, print log.
+    if (stageFiles.length > 0) {
+      LOGGER.warn(s"failed to clean up stage files:" + 
stageFiles.map(_._1.getName).mkString(","))
+    }
+  }
+
+  /**
+   * Delete snapshot file with retry
+   * Return false means the snapshot file was cleaned successfully
+   * While return true means the snapshot file was failed to clean
+   */
+  private def deleteSnapShotFile(
+      snapshotFilePath: String): Boolean = {
+    val snapshotFile = FileFactory.getCarbonFile(snapshotFilePath)
+    // If delete() return false, maybe the reason is FileNotFount or 
FileFailedClean.
+    // Considering FileNotFound means FileCleanSucessfully.
+    // We need double check the file exists or not when delete() return false.
+    if (!snapshotFile.delete()) {
+      return snapshotFile.exists()
+    }
+    true
+  }
+
+  /**
+   * Delete snapshot file with retry
+   */
+  private def deleteSnapShotFileWithRetry(
+      table: CarbonTable,
+      snapshotFilePath: String): Unit = {
+    if (table.isHivePartitionTable) {
+      return
+    }
+    var retries = DELETE_FILES_RETRY_TIMES
+    while(deleteSnapShotFile(snapshotFilePath) && retries > 0) {
+      retries -= 1
+    }
   }
 
   /*

Reply via email to