[CARBONDATA-2708][BloomDataMap] clear index file in case of data load failure

When data loading fails, clean up the index DataMap files that were generated

This closes #2463


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1fd37039
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1fd37039
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1fd37039

Branch: refs/heads/carbonstore
Commit: 1fd3703990c60a6459861575c4f9e0f751be8c5e
Parents: 18381e3
Author: xuchuanyin <xuchuan...@hust.edu.cn>
Authored: Wed Jul 11 17:01:21 2018 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Fri Jul 13 15:04:53 2018 +0800

----------------------------------------------------------------------
 .../carbondata/core/datamap/TableDataMap.java   |  2 +-
 .../core/datamap/dev/DataMapFactory.java        |  2 +-
 .../blockletindex/BlockletDataMapFactory.java   |  2 +-
 .../bloom/BloomCoarseGrainDataMapFactory.java   |  6 +++---
 .../examples/MinMaxIndexDataMapFactory.java     |  2 +-
 .../lucene/LuceneDataMapFactoryBase.java        | 11 ++++++-----
 .../testsuite/datamap/CGDataMapTestCase.scala   |  2 +-
 .../testsuite/datamap/DataMapWriterSuite.scala  |  2 +-
 .../testsuite/datamap/FGDataMapTestCase.scala   |  2 +-
 .../testsuite/datamap/TestDataMapStatus.scala   |  2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 20 +++++++++++++++++++-
 11 files changed, 36 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/1fd37039/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 89a4c86..f6da73e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -210,7 +210,7 @@ public final class TableDataMap extends 
OperationEventListener {
   /**
    * delete only the datamaps of the segments
    */
-  public void deleteDatamapData(List<Segment> segments) {
+  public void deleteDatamapData(List<Segment> segments) throws IOException {
     for (Segment segment: segments) {
       dataMapFactory.deleteDatamapData(segment);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1fd37039/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index b115462..0889f8b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -113,7 +113,7 @@ public abstract class DataMapFactory<T extends DataMap> {
   /**
    * delete datamap data in the specified segment
    */
-  public abstract void deleteDatamapData(Segment segment);
+  public abstract void deleteDatamapData(Segment segment) throws IOException;
 
   /**
    * delete datamap data if any

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1fd37039/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 175a2a4..643cc45 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -331,7 +331,7 @@ public class BlockletDataMapFactory extends 
CoarseGrainDataMapFactory
   }
 
   @Override
-  public void deleteDatamapData(Segment segment) {
+  public void deleteDatamapData(Segment segment) throws IOException {
 
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1fd37039/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index 68cf45c..6183077 100644
--- 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -348,7 +348,7 @@ public class BloomCoarseGrainDataMapFactory extends 
DataMapFactory<CoarseGrainDa
   }
 
   @Override
-  public void deleteDatamapData(Segment segment) {
+  public void deleteDatamapData(Segment segment) throws IOException {
     try {
       String segmentId = segment.getSegmentNo();
       String datamapPath = CarbonTablePath
@@ -358,8 +358,8 @@ public class BloomCoarseGrainDataMapFactory extends 
DataMapFactory<CoarseGrainDa
             FileFactory.getFileType(datamapPath));
         CarbonUtil.deleteFoldersAndFilesSilent(file);
       }
-    } catch (IOException | InterruptedException ex) {
-      LOGGER.error("Failed to delete datamap for segment_" + 
segment.getSegmentNo());
+    } catch (InterruptedException ex) {
+      throw new IOException("Failed to delete datamap for segment_" + 
segment.getSegmentNo());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1fd37039/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
index 506b91a..1361d7a 100644
--- 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
+++ 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
@@ -157,7 +157,7 @@ public class MinMaxIndexDataMapFactory extends 
CoarseGrainDataMapFactory {
   }
 
   @Override
-  public void deleteDatamapData(Segment segment) {
+  public void deleteDatamapData(Segment segment) throws IOException {
 
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1fd37039/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
----------------------------------------------------------------------
diff --git 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
index 8186841..cd225f1 100644
--- 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
+++ 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -260,7 +260,8 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> 
extends DataMapFactor
 
   }
 
-  @Override public void deleteDatamapData(Segment segment) {
+  @Override
+  public void deleteDatamapData(Segment segment) throws IOException {
     try {
       String segmentId = segment.getSegmentNo();
       String datamapPath = CarbonTablePath
@@ -270,13 +271,13 @@ abstract class LuceneDataMapFactoryBase<T extends 
DataMap> extends DataMapFactor
             FileFactory.getFileType(datamapPath));
         CarbonUtil.deleteFoldersAndFilesSilent(file);
       }
-    } catch (IOException | InterruptedException ex) {
-      throw new RuntimeException(
-          "drop datamap failed, failed to delete datamap directory");
+    } catch (InterruptedException ex) {
+      throw new IOException("drop datamap failed, failed to delete datamap 
directory");
     }
   }
 
-  @Override public void deleteDatamapData() {
+  @Override
+  public void deleteDatamapData() {
     try {
       deleteDatamap();
     } catch (MalformedDataMapCommandException ex) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1fd37039/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 9a7cb99..046a2a6 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -139,7 +139,7 @@ class CGDataMapFactory(
    * delete datamap of the segment
    */
   override def deleteDatamapData(segment: Segment): Unit = {
-    ???
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1fd37039/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 434c363..c015f8d 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -74,7 +74,7 @@ class C2DataMapFactory(
    * delete datamap of the segment
    */
   override def deleteDatamapData(segment: Segment): Unit = {
-    ???
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1fd37039/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 9866683..b13582b 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -132,7 +132,7 @@ class FGDataMapFactory(carbonTable: CarbonTable,
    * delete datamap of the segment
    */
   override def deleteDatamapData(segment: Segment): Unit = {
-    ???
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1fd37039/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
index 0638a07..f1c9432 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
@@ -270,7 +270,7 @@ class TestDataMapFactory(
    * delete datamap of the segment
    */
   override def deleteDatamapData(segment: Segment): Unit = {
-    ???
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1fd37039/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 2334871..543ba30 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -47,7 +47,7 @@ import org.apache.spark.sql.util.CarbonException
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.datastore.block.{Distributable, 
TableBlockInfo}
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
@@ -475,6 +475,7 @@ object CarbonDataRDDFactory {
       if (carbonLoadModel.isCarbonTransactionalTable) {
         // delete segment is applicable for transactional table
         CarbonLoaderUtil.deleteSegment(carbonLoadModel, 
carbonLoadModel.getSegmentId.toInt)
+        clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId)
       }
       LOGGER.info("********clean up done**********")
       LOGGER.audit(s"Data load is failed for " +
@@ -493,6 +494,7 @@ object CarbonDataRDDFactory {
         if (carbonLoadModel.isCarbonTransactionalTable) {
           // delete segment is applicable for transactional table
           CarbonLoaderUtil.deleteSegment(carbonLoadModel, 
carbonLoadModel.getSegmentId.toInt)
+          clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId)
         }
         LOGGER.info("********clean up done**********")
         LOGGER.audit(s"Data load is failed for " +
@@ -554,6 +556,7 @@ object CarbonDataRDDFactory {
         if (carbonLoadModel.isCarbonTransactionalTable) {
           // delete segment is applicable for transactional table
           CarbonLoaderUtil.deleteSegment(carbonLoadModel, 
carbonLoadModel.getSegmentId.toInt)
+          clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId)
         }
         LOGGER.info("********clean up done**********")
         LOGGER.audit("Data load is failed for " +
@@ -589,6 +592,21 @@ object CarbonDataRDDFactory {
   }
 
   /**
+   * clear datamap files for segment
+   */
+  private def clearDataMapFiles(carbonTable: CarbonTable, segmentId: String): 
Unit = {
+    try {
+      val segments = List(new Segment(segmentId)).asJava
+      DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala
+        .filter(_.getDataMapSchema.isIndexDataMap)
+        .foreach(_.deleteDatamapData(segments))
+    } catch {
+      case ex : Exception =>
+        LOGGER.error(s"Failed to clear datamap files for" +
+                     s" 
${carbonTable.getDatabaseName}.${carbonTable.getTableName}")
+    }
+  }
+  /**
    * Add and update the segment files. In case of update scenario the 
carbonindex files are written
    * to the same segment so we need to update old segment file. So this ethod 
writes the latest data
    * to new segment file and merges this file old file to get latest updated 
files.

Reply via email to