marchpure commented on a change in pull request #3886:
URL: https://github.com/apache/carbondata/pull/3886#discussion_r468461356
##
File path:
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
##
@@ -499,25 +499,31 @@ case class CarbonInsertFromStageCommand(
* return the loading files failed to create
*/
private def createStageLoadingFiles(
+ stagePath: String,
executorService: ExecutorService,
stageFiles: Array[(CarbonFile, CarbonFile)]): Array[(CarbonFile,
CarbonFile)] = {
stageFiles.map { files =>
executorService.submit(new Callable[(CarbonFile, CarbonFile, Boolean)] {
override def call(): (CarbonFile, CarbonFile, Boolean) = {
- // Get the loading files path
- val stageLoadingFile =
-FileFactory.getCarbonFile(files._1.getAbsolutePath +
- CarbonTablePath.LOADING_FILE_SUFFIX);
- // Try to create loading files
- // make isFailed to be true if createNewFile return false.
- // the reason can be file exists or exceptions.
- var isFailed = !stageLoadingFile.createNewFile()
- // if file exists, modify the lastModifiedTime of the file.
- if (isFailed) {
-// make isFailed to be true if setLastModifiedTime return false.
-isFailed =
!stageLoadingFile.setLastModifiedTime(System.currentTimeMillis());
+ try {
+// Get the loading files path
+val stageLoadingFile =
+ FileFactory.getCarbonFile(stagePath +
+CarbonCommonConstants.FILE_SEPARATOR +
+files._1.getName + CarbonTablePath.LOADING_FILE_SUFFIX);
+// Try to create loading files
+// make isFailed to be true if createNewFile return false.
+// the reason can be file exists or exceptions.
+var isFailed = !stageLoadingFile.createNewFile()
+// if file exists, modify the lastmodifiedtime of the file.
+if (isFailed) {
+ // make isFailed to be true if setLastModifiedTime return false.
+ isFailed =
!stageLoadingFile.setLastModifiedTime(System.currentTimeMillis());
+}
+(files._1, files._2, isFailed)
+ } catch {
+case _ : Exception => (files._1, files._2, true)
Review comment:
The third parameter is 'isFailed'. When isFailed is 'true',
we can retry deleting the files.
##
File path:
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
##
@@ -557,25 +564,30 @@ case class CarbonInsertFromStageCommand(
* Return the files failed to delete
*/
private def deleteStageFiles(
+ stagePath: String,
executorService: ExecutorService,
stageFiles: Array[(CarbonFile, CarbonFile)]): Array[(CarbonFile,
CarbonFile)] = {
stageFiles.map { files =>
executorService.submit(new Callable[(CarbonFile, CarbonFile, Boolean)] {
override def call(): (CarbonFile, CarbonFile, Boolean) = {
// Delete three types of file: stage|.success|.loading
- val stageLoadingFile =
-FileFactory.getCarbonFile(files._1.getAbsolutePath
- + CarbonTablePath.LOADING_FILE_SUFFIX);
- var isFailed = false
- // If delete() return false, maybe the reason is FileNotFount or
FileFailedClean.
- // Considering FileNotFound means file clean successfully.
- // We need double check the file exists or not when delete() return
false.
- if (!(files._1.delete() && files._2.delete() &&
stageLoadingFile.delete())) {
-// If the file still exists, make isFailed to be true
-// So we can retry to delete this file.
-isFailed = files._1.exists() || files._1.exists() ||
stageLoadingFile.exists()
+ try {
+val stageLoadingFile = FileFactory.getCarbonFile(stagePath +
+ CarbonCommonConstants.FILE_SEPARATOR +
+ files._1.getName + CarbonTablePath.LOADING_FILE_SUFFIX);
+var isFailed = false
+// If delete() return false, maybe the reason is FileNotFount or
FileFailedClean.
+// Considering FileNotFound means FileCleanSucessfully.
+// We need double check the file exists or not when delete()
return false.
+if (!files._1.delete() || !files._2.delete() ||
!stageLoadingFile.delete()) {
+ // If the file still exists, make isFailed to be true
+ // So we can retry to delete this file.
+ isFailed = files._1.exists() || files._1.exists() ||
stageLoadingFile.exists()
+}
+(files._1, files._2, isFailed)
+ } catch {
+case _: Exception => (files._1, files._2, true)
Review comment:
The third parameter is 'isFailed'. When isFa