niuge01 commented on a change in pull request #3799:
URL: https://github.com/apache/carbondata/pull/3799#discussion_r443412925



##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
##########
@@ -477,6 +487,48 @@ case class CarbonInsertFromStageCommand(
     output.asScala
   }
 
+  /**
+   * create '.loading' file to tag the stage in process
+   * Return false means the stage files were creat successfully
+   * While return true means the stage files were failed to create
+   */
+  private def createStageLoadingFiles(
+      executorService: ExecutorService,
+      stageFiles: Array[(CarbonFile, CarbonFile)]): Array[(CarbonFile, 
CarbonFile)] = {
+    stageFiles.map { files =>
+      executorService.submit(new Callable[Boolean] {
+        override def call(): Boolean = {
+          val stageLoadingFile =
+            FileFactory.getCarbonFile(files._1.getAbsolutePath +
+              CarbonTablePath.LOADING_FILE_SUBFIX);
+          if (!stageLoadingFile.exists()) {
+            stageLoadingFile.createNewFile();
+          } else {
+            stageLoadingFile.setLastModifiedTime(System.currentTimeMillis());
+          }
+        }
+      })
+    }.filter { future =>
+      future.get()
+    }
+    stageFiles
+  }
+
+  /**
+   * create '.loading' file with retry
+   */
+  private def createStageLoadingFilesWithRetry(
+      executorService: ExecutorService,
+      stageFiles: Array[(CarbonFile, CarbonFile)]): Unit = {
+    val startTime = System.currentTimeMillis()
+    var retry = CarbonInsertFromStageCommand.DELETE_FILES_RETRY_TIMES
+    while (createStageLoadingFiles(executorService, stageFiles).length > 0 && 
retry > 0) {

Review comment:
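       Please check this loop condition: if
createStageLoadingFiles(executorService, stageFiles).length > 0, should the loop
continue? Note that createStageLoadingFiles currently returns `stageFiles`
unchanged (the result of the `filter` is discarded), so the length is always
> 0 for non-empty input and the loop only stops once the retries run out.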

##########
File path: 
core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
##########
@@ -1903,6 +1903,30 @@ public static Long getInputMetricsInterval() {
     }
   }
 
+  /**
+   * Validate and get the input metrics interval
+   *
+   * @return input metrics interval
+   */
+  public static Long getInsertStageTimeout() {
+    String timeout = CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT);
+    if (timeout == null) {
+      return CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT_DEFAULT;
+    } else {
+      try {
+        long configuredValue = Long.parseLong(timeout);
+        if (configuredValue < 0) {
+          return CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT_DEFAULT;

Review comment:
       Log a warning for the illegal (negative) configuration value.

##########
File path: 
core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
##########
@@ -1903,6 +1903,30 @@ public static Long getInputMetricsInterval() {
     }
   }
 
+  /**
+   * Validate and get the input metrics interval
+   *
+   * @return input metrics interval
+   */
+  public static Long getInsertStageTimeout() {
+    String timeout = CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT);
+    if (timeout == null) {
+      return CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT_DEFAULT;
+    } else {
+      try {
+        long configuredValue = Long.parseLong(timeout);
+        if (configuredValue < 0) {
+          return CarbonCommonConstants.CARBON_INSERT_STAGE_TIMEOUT_DEFAULT;
+        } else {
+          return configuredValue;
+        }
+      } catch (Exception ex) {

Review comment:
       Catch NumberFormatException instead of Exception.
   Log a warning when the exception occurs.

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
##########
@@ -148,10 +149,19 @@ case class CarbonInsertFromStageCommand(
         return Seq.empty
       }
 
-      // 2) read all stage files to collect input files for data loading
-      // create a thread pool to read them
+      // We add a tag 'loading' to the stages in process.
+      // different insertstage processes can load different data separately
+      // by choose the stages without 'loading' tag or stages loaded timeout.
+      // which avoid loading the same data between concurrent insertstage 
processes.
+      // The 'loading' tag is actually an empty file with
+      // '.loading' suffix filename
       val numThreads = Math.min(Math.max(stageFiles.length, 1), 10)
       val executorService = Executors.newFixedThreadPool(numThreads)
+      createStageLoadingFilesWithRetry(executorService, stageFiles)
+      lock.unlock()

Review comment:
       Remove this line; the lock will be released in the finally block.

##########
File path: 
core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -1521,6 +1521,10 @@ private CarbonCommonConstants() {
 
   public static final String CARBON_QUERY_STAGE_INPUT_DEFAULT = "false";
 
+  public static final String CARBON_INSERT_STAGE_TIMEOUT = 
"carbon.insert.stage.timeout";
+
+  public static final long CARBON_INSERT_STAGE_TIMEOUT_DEFAULT = 28800000;

Review comment:
       Add a comment here, e.g. `// 8 hours` (28800000 ms).

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
##########
@@ -477,6 +487,48 @@ case class CarbonInsertFromStageCommand(
     output.asScala
   }
 
+  /**
+   * create '.loading' file to tag the stage in process
+   * Return false means the stage files were creat successfully
+   * While return true means the stage files were failed to create
+   */
+  private def createStageLoadingFiles(
+      executorService: ExecutorService,
+      stageFiles: Array[(CarbonFile, CarbonFile)]): Array[(CarbonFile, 
CarbonFile)] = {
+    stageFiles.map { files =>
+      executorService.submit(new Callable[Boolean] {
+        override def call(): Boolean = {
+          val stageLoadingFile =
+            FileFactory.getCarbonFile(files._1.getAbsolutePath +
+              CarbonTablePath.LOADING_FILE_SUBFIX);
+          if (!stageLoadingFile.exists()) {

Review comment:
       `exists` and `createNewFile` are not executed atomically, so
`createNewFile` may fail if another process creates the file in between. Suggest changing to:
   if (!stageLoadingFile.createNewFile()) {
     stageLoadingFile.setLastModifiedTime(System.currentTimeMillis());
   }




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to