Changes done:
1. Clean up of the folders created locally during data load and insert-into operations.
2. Setting the load status properly for the success, partial success, and failure cases.
3. Printing load statistics in case of success and partial success (a sketch of the combined pattern follows below).
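
Below is a minimal, standalone Java sketch of that combined pattern, intended only as an illustration of the idea: the LoadStatus enum and the doLoad / cleanUpLocalFolders / printLoadStatistics callables are placeholders, not CarbonData APIs. The status starts out as SUCCESS, is downgraded in the catch clauses, and the finally block always cleans up the local folders but prints statistics only when the load did not fail (a failed load is typically retried, so its statistics would otherwise be printed repeatedly).

public class LoadStatusSketch {

  // Placeholder status values standing in for the STORE_LOADSTATUS_* constants
  enum LoadStatus { SUCCESS, PARTIAL_SUCCESS, FAILURE }

  // Placeholder for the exception thrown when only bad records were rejected
  static class BadRecordFoundException extends RuntimeException { }

  static LoadStatus runLoad(Runnable doLoad, Runnable cleanUpLocalFolders,
      Runnable printLoadStatistics) {
    // Start optimistic and only downgrade the status when something goes wrong.
    LoadStatus status = LoadStatus.SUCCESS;
    try {
      doLoad.run();
    } catch (BadRecordFoundException e) {
      status = LoadStatus.PARTIAL_SUCCESS;   // load finished, but some records were rejected
    } catch (RuntimeException e) {
      status = LoadStatus.FAILURE;           // load failed; the caller may retry it
      throw e;
    } finally {
      // Always clean up the local load folders, whatever the outcome.
      cleanUpLocalFolders.run();
      // Print statistics only for success / partial success; failures are retried.
      if (status != LoadStatus.FAILURE) {
        printLoadStatistics.run();
      }
    }
    return status;
  }

  public static void main(String[] args) {
    LoadStatus status = runLoad(
        () -> System.out.println("loading data..."),
        () -> System.out.println("cleaning up local load folders"),
        () -> System.out.println("printing load statistics"));
    System.out.println("final load status: " + status);
  }
}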


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/487e41dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/487e41dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/487e41dd

Branch: refs/heads/12-dev
Commit: 487e41ddd13d9e19813cc1b9c4eda73376f1c8ba
Parents: ada081d
Author: manishgupta88 <tomanishgupt...@gmail.com>
Authored: Wed Apr 5 19:04:14 2017 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Thu Apr 6 10:33:07 2017 +0530

----------------------------------------------------------------------
 .../carbondata/spark/load/CarbonLoaderUtil.java | 30 ++++++++++++++++----
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 30 ++++++++++++++++----
 2 files changed, 49 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/487e41dd/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index cc16398..95f0b10 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -36,6 +36,9 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -171,13 +174,30 @@ public final class CarbonLoaderUtil {
       tempLocationKey = CarbonCommonConstants.COMPACTION_KEY_WORD + '_' + tempLocationKey;
     }
     // form local store location
-    String localStoreLocation = CarbonProperties.getInstance()
+    final String localStoreLocation = CarbonProperties.getInstance()
         .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
+    // submit local folder clean up in another thread so that main thread execution is not blocked
+    ExecutorService localFolderDeletionService = Executors.newFixedThreadPool(1);
     try {
-      CarbonUtil.deleteFoldersAndFiles(new File(localStoreLocation).getParentFile());
-      LOGGER.info("Deleted the local store location" + localStoreLocation);
-    } catch (IOException | InterruptedException e) {
-      LOGGER.error(e, "Failed to delete local data load folder location");
+      localFolderDeletionService.submit(new Callable<Void>() {
+        @Override public Void call() throws Exception {
+          try {
+            long startTime = System.currentTimeMillis();
+            File file = new File(localStoreLocation);
+            CarbonUtil.deleteFoldersAndFiles(file.getParentFile());
+            LOGGER.info(
+                "Deleted the local store location" + localStoreLocation + " : TIme taken: " + (
+                    System.currentTimeMillis() - startTime));
+          } catch (IOException | InterruptedException e) {
+            LOGGER.error(e, "Failed to delete local data load folder location");
+          }
+          return null;
+        }
+      });
+    } finally {
+      if (null != localFolderDeletionService) {
+        localFolderDeletionService.shutdown();
+      }
     }
 
   }
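
The hunk above hands the local-folder deletion to a single-threaded ExecutorService so the main data-load thread is not blocked, and shuts the service down in a finally block; shutdown() stops new submissions but lets the already submitted deletion run to completion. A self-contained sketch of the same fire-and-forget pattern follows, with an illustrative deleteRecursively helper and temp path standing in for CarbonUtil.deleteFoldersAndFiles and the configured store location (those names are assumptions, not CarbonData code).

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

public class AsyncLocalFolderCleanup {

  // Illustrative stand-in for CarbonUtil.deleteFoldersAndFiles: delete children before parents.
  static void deleteRecursively(Path root) throws IOException {
    if (!Files.exists(root)) {
      return;
    }
    try (Stream<Path> walk = Files.walk(root)) {
      walk.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
    }
  }

  public static void main(String[] args) {
    // Illustrative temp location; the real code reads it from CarbonProperties.
    final Path localStoreLocation =
        Paths.get(System.getProperty("java.io.tmpdir"), "carbon_load_tmp");
    // One worker thread: the deletion runs off the caller's thread.
    ExecutorService localFolderDeletionService = Executors.newFixedThreadPool(1);
    try {
      localFolderDeletionService.submit(() -> {
        long startTime = System.currentTimeMillis();
        try {
          deleteRecursively(localStoreLocation);
          System.out.println("Deleted " + localStoreLocation + ", time taken: "
              + (System.currentTimeMillis() - startTime) + " ms");
        } catch (IOException e) {
          System.err.println("Failed to delete local data load folder: " + e);
        }
        return null;   // makes the lambda a Callable, mirroring the Callable<Void> in the diff
      });
    } finally {
      // Orderly shutdown: no new tasks accepted, but the submitted deletion still completes.
      localFolderDeletionService.shutdown();
    }
  }
}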

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/487e41dd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 0690ba1..72ee90f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -223,7 +223,7 @@ class NewCarbonDataLoadRDD[K, V](
         carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
       try {
         loadMetadataDetails.setPartitionCount(partitionID)
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
 
         carbonLoadModel.setSegmentId(String.valueOf(loadCount))
         val preFetch = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
@@ -237,7 +237,6 @@ class NewCarbonDataLoadRDD[K, V](
           loadMetadataDetails)
         // Intialize to set carbon properties
         loader.initialize()
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
         new DataLoadExecutor().execute(model,
           loader.storeLocation,
           recordReaders)
@@ -246,9 +245,20 @@
           loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
           logInfo("Bad Record Found")
         case e: Exception =>
+          loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
           logInfo("DataLoad failure", e)
           LOGGER.error(e)
           throw e
+      } finally {
+        // clean up the folders and files created locally for data load operation
+        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false)
+        // in case of failure the same operation will be re-tried several times.
+        // So print the data load statistics only in case of non failure case
+        if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+          .equals(loadMetadataDetails.getLoadStatus)) {
+          CarbonTimeStatisticsFactory.getLoadStatisticsInstance
+            .printStatisticsInfo(model.getPartitionId)
+        }
       }
 
       def getInputIterators: Array[CarbonIterator[Array[AnyRef]]] = {
@@ -389,7 +399,7 @@ class NewDataFrameLoaderRDD[K, V](
       try {
 
         loadMetadataDetails.setPartitionCount(partitionID)
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
         carbonLoadModel.setPartitionId(partitionID)
         carbonLoadModel.setSegmentId(String.valueOf(loadCount))
         carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
@@ -420,18 +430,26 @@ class NewDataFrameLoaderRDD[K, V](
           loadMetadataDetails)
         // Intialize to set carbon properties
         loader.initialize()
-
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
         new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders.toArray)
-
       } catch {
         case e: BadRecordFoundException =>
           loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
           logInfo("Bad Record Found")
         case e: Exception =>
+          loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
           logInfo("DataLoad failure", e)
           LOGGER.error(e)
           throw e
+      } finally {
+        // clean up the folders and files created locally for data load operation
+        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false)
+        // in case of failure the same operation will be re-tried several times.
+        // So print the data load statistics only in case of non failure case
+        if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+          .equals(loadMetadataDetails.getLoadStatus)) {
+          CarbonTimeStatisticsFactory.getLoadStatisticsInstance
+            .printStatisticsInfo(model.getPartitionId)
+        }
       }
       var finished = false
 
