[GitHub] [carbondata] marchpure commented on a change in pull request #3886: [CARBONDATA-3944] Delete stage files was interrupted when IOException…

2020-08-11 Thread GitBox


marchpure commented on a change in pull request #3886:
URL: https://github.com/apache/carbondata/pull/3886#discussion_r468461356



##
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
##
@@ -499,25 +499,31 @@ case class CarbonInsertFromStageCommand(
* return the loading files failed to create
*/
   private def createStageLoadingFiles(
+      stagePath: String,
       executorService: ExecutorService,
       stageFiles: Array[(CarbonFile, CarbonFile)]): Array[(CarbonFile, CarbonFile)] = {
     stageFiles.map { files =>
       executorService.submit(new Callable[(CarbonFile, CarbonFile, Boolean)] {
         override def call(): (CarbonFile, CarbonFile, Boolean) = {
-          // Get the loading files path
-          val stageLoadingFile =
-            FileFactory.getCarbonFile(files._1.getAbsolutePath +
-              CarbonTablePath.LOADING_FILE_SUFFIX);
-          // Try to create loading files
-          // make isFailed to be true if createNewFile return false.
-          // the reason can be file exists or exceptions.
-          var isFailed = !stageLoadingFile.createNewFile()
-          // if file exists, modify the lastModifiedTime of the file.
-          if (isFailed) {
-            // make isFailed to be true if setLastModifiedTime return false.
-            isFailed = !stageLoadingFile.setLastModifiedTime(System.currentTimeMillis());
+          try {
+            // Get the loading files path
+            val stageLoadingFile =
+              FileFactory.getCarbonFile(stagePath +
+                CarbonCommonConstants.FILE_SEPARATOR +
+                files._1.getName + CarbonTablePath.LOADING_FILE_SUFFIX);
+            // Try to create loading files
+            // make isFailed to be true if createNewFile return false.
+            // the reason can be file exists or exceptions.
+            var isFailed = !stageLoadingFile.createNewFile()
+            // if file exists, modify the lastmodifiedtime of the file.
+            if (isFailed) {
+              // make isFailed to be true if setLastModifiedTime return false.
+              isFailed = !stageLoadingFile.setLastModifiedTime(System.currentTimeMillis());
+            }
+            (files._1, files._2, isFailed)
+          } catch {
+            case _ : Exception => (files._1, files._2, true)

Review comment:
   The third parameter is 'isFailed'. When isFailed is 'true', we can retry deleting the files.
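
For illustration, the failure-flag pattern described in this comment can be sketched outside CarbonData. This is a minimal sketch under stated assumptions, not the PR's code: it uses plain java.io.File in place of CarbonFile, and StageLoadingSketch, LoadingSuffix and createLoadingFiles are hypothetical names. Any exception thrown inside call() is mapped to isFailed = true, so a single bad file does not interrupt the batch and the caller receives only the entries that may be worth retrying.

```scala
import java.io.File
import java.util.concurrent.{Callable, ExecutorService}

object StageLoadingSketch {
  // Hypothetical suffix; the PR uses CarbonTablePath.LOADING_FILE_SUFFIX.
  val LoadingSuffix = ".loading"

  // Returns the stage files whose loading marker could not be created or refreshed.
  def createLoadingFiles(
      stageDir: File,
      executor: ExecutorService,
      stageFiles: Array[File]): Array[File] = {
    stageFiles.map { stageFile =>
      executor.submit(new Callable[(File, Boolean)] {
        override def call(): (File, Boolean) = {
          try {
            val loading = new File(stageDir, stageFile.getName + LoadingSuffix)
            // createNewFile() returns false when the marker already exists.
            var isFailed = !loading.createNewFile()
            if (isFailed) {
              // Marker exists: refresh its timestamp; fail only if that is rejected.
              isFailed = !loading.setLastModified(System.currentTimeMillis())
            }
            (stageFile, isFailed)
          } catch {
            // An exception is reported as a failure instead of aborting the batch.
            case _: Exception => (stageFile, true)
          }
        }
      })
    }.map(_.get()).filter(_._2).map(_._1)
  }
}
```

A caller could feed the returned array back into the same method for a bounded number of retries.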

##
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
##
@@ -557,25 +564,30 @@ case class CarbonInsertFromStageCommand(
* Return the files failed to delete
*/
   private def deleteStageFiles(
+      stagePath: String,
       executorService: ExecutorService,
       stageFiles: Array[(CarbonFile, CarbonFile)]): Array[(CarbonFile, CarbonFile)] = {
     stageFiles.map { files =>
       executorService.submit(new Callable[(CarbonFile, CarbonFile, Boolean)] {
         override def call(): (CarbonFile, CarbonFile, Boolean) = {
           // Delete three types of file: stage|.success|.loading
-          val stageLoadingFile =
-            FileFactory.getCarbonFile(files._1.getAbsolutePath
-              + CarbonTablePath.LOADING_FILE_SUFFIX);
-          var isFailed = false
-          // If delete() return false, maybe the reason is FileNotFount or FileFailedClean.
-          // Considering FileNotFound means file clean successfully.
-          // We need double check the file exists or not when delete() return false.
-          if (!(files._1.delete() && files._2.delete() && stageLoadingFile.delete())) {
-            // If the file still exists,  make isFailed to be true
-            // So we can retry to delete this file.
-            isFailed = files._1.exists() || files._1.exists() || stageLoadingFile.exists()
+          try {
+            val stageLoadingFile = FileFactory.getCarbonFile(stagePath +
+              CarbonCommonConstants.FILE_SEPARATOR +
+              files._1.getName + CarbonTablePath.LOADING_FILE_SUFFIX);
+            var isFailed = false
+            // If delete() return false, maybe the reason is FileNotFount or FileFailedClean.
+            // Considering FileNotFound means FileCleanSucessfully.
+            // We need double check the file exists or not when delete() return false.
+            if (!files._1.delete() || !files._2.delete() || !stageLoadingFile.delete()) {
+              // If the file still exists,  make isFailed to be true
+              // So we can retry to delete this file.
+              isFailed = files._1.exists() || files._1.exists() || stageLoadingFile.exists()
+            }
+            (files._1, files._2, isFailed)
+          } catch {
+            case _: Exception => (files._1, files._2, true)

Review comment:
   The third parameter is 'isFailed'. When isFa
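
The delete-then-verify logic in the hunk above can be sketched as follows. This is a hedged illustration, not the PR's code: it assumes java.io.File instead of CarbonFile, and StageDeleteSketch and deleteAll are hypothetical names. Two deliberate differences from the quoted hunk: every delete is attempted even when an earlier one returns false (the `||` chain in the hunk short-circuits), and the follow-up existence check covers every file (the quoted line tests files._1 twice).

```scala
import java.io.File

object StageDeleteSketch {
  // Returns isFailed: true only when some file still exists after the attempt.
  // delete() returning false is ambiguous: the file may already be gone
  // (treated as success) or the delete may really have failed (retry).
  def deleteAll(files: Seq[File]): Boolean = {
    try {
      // f.delete() is evaluated first on every step, so no delete is skipped.
      val allDeleted = files.foldLeft(true)((ok, f) => f.delete() && ok)
      // Double-check existence only when some delete() reported false.
      !allDeleted && files.exists(_.exists())
    } catch {
      // Any exception is reported as a failure so the caller can retry.
      case _: Exception => true
    }
  }
}
```

With this shape, calling deleteAll a second time on files that are already gone reports success, which is what makes a retry loop safe to repeat.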

[GitHub] [carbondata] marchpure commented on a change in pull request #3886: [CARBONDATA-3944] Delete stage files was interrupted when IOException…

2020-08-11 Thread GitBox


marchpure commented on a change in pull request #3886:
URL: https://github.com/apache/carbondata/pull/3886#discussion_r468453764



##
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
##
@@ -499,25 +499,31 @@ case class CarbonInsertFromStageCommand(
* return the loading files failed to create
*/
   private def createStageLoadingFiles(
+      stagePath: String,
       executorService: ExecutorService,
       stageFiles: Array[(CarbonFile, CarbonFile)]): Array[(CarbonFile, CarbonFile)] = {
     stageFiles.map { files =>
       executorService.submit(new Callable[(CarbonFile, CarbonFile, Boolean)] {
         override def call(): (CarbonFile, CarbonFile, Boolean) = {
-          // Get the loading files path
-          val stageLoadingFile =
-            FileFactory.getCarbonFile(files._1.getAbsolutePath +
-              CarbonTablePath.LOADING_FILE_SUFFIX);
-          // Try to create loading files
-          // make isFailed to be true if createNewFile return false.
-          // the reason can be file exists or exceptions.
-          var isFailed = !stageLoadingFile.createNewFile()
-          // if file exists, modify the lastModifiedTime of the file.
-          if (isFailed) {
-            // make isFailed to be true if setLastModifiedTime return false.
-            isFailed = !stageLoadingFile.setLastModifiedTime(System.currentTimeMillis());
+          try {
+            // Get the loading files path
+            val stageLoadingFile =
+              FileFactory.getCarbonFile(stagePath +
+                CarbonCommonConstants.FILE_SEPARATOR +

Review comment:
   modified





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] marchpure commented on a change in pull request #3886: [CARBONDATA-3944] Delete stage files was interrupted when IOException…

2020-08-11 Thread GitBox


marchpure commented on a change in pull request #3886:
URL: https://github.com/apache/carbondata/pull/3886#discussion_r468453680



##
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
##
@@ -557,25 +564,30 @@ case class CarbonInsertFromStageCommand(
* Return the files failed to delete
*/
   private def deleteStageFiles(
+      stagePath: String,
       executorService: ExecutorService,
       stageFiles: Array[(CarbonFile, CarbonFile)]): Array[(CarbonFile, CarbonFile)] = {
     stageFiles.map { files =>
       executorService.submit(new Callable[(CarbonFile, CarbonFile, Boolean)] {
         override def call(): (CarbonFile, CarbonFile, Boolean) = {
           // Delete three types of file: stage|.success|.loading
-          val stageLoadingFile =
-            FileFactory.getCarbonFile(files._1.getAbsolutePath
-              + CarbonTablePath.LOADING_FILE_SUFFIX);
-          var isFailed = false
-          // If delete() return false, maybe the reason is FileNotFount or FileFailedClean.
-          // Considering FileNotFound means file clean successfully.
-          // We need double check the file exists or not when delete() return false.
-          if (!(files._1.delete() && files._2.delete() && stageLoadingFile.delete())) {
-            // If the file still exists,  make isFailed to be true
-            // So we can retry to delete this file.
-            isFailed = files._1.exists() || files._1.exists() || stageLoadingFile.exists()
+          try {
+            val stageLoadingFile = FileFactory.getCarbonFile(stagePath +
+              CarbonCommonConstants.FILE_SEPARATOR +

Review comment:
   modified




