[CARBONDATA-2708][BloomDataMap] clear index file in case of data load failure
When data loading fails, clean up the index DataMap files that were generated. This closes #2463 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1fd37039 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1fd37039 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1fd37039 Branch: refs/heads/carbonstore Commit: 1fd3703990c60a6459861575c4f9e0f751be8c5e Parents: 18381e3 Author: xuchuanyin <xuchuan...@hust.edu.cn> Authored: Wed Jul 11 17:01:21 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Fri Jul 13 15:04:53 2018 +0800 ---------------------------------------------------------------------- .../carbondata/core/datamap/TableDataMap.java | 2 +- .../core/datamap/dev/DataMapFactory.java | 2 +- .../blockletindex/BlockletDataMapFactory.java | 2 +- .../bloom/BloomCoarseGrainDataMapFactory.java | 6 +++--- .../examples/MinMaxIndexDataMapFactory.java | 2 +- .../lucene/LuceneDataMapFactoryBase.java | 11 ++++++----- .../testsuite/datamap/CGDataMapTestCase.scala | 2 +- .../testsuite/datamap/DataMapWriterSuite.scala | 2 +- .../testsuite/datamap/FGDataMapTestCase.scala | 2 +- .../testsuite/datamap/TestDataMapStatus.scala | 2 +- .../spark/rdd/CarbonDataRDDFactory.scala | 20 +++++++++++++++++++- 11 files changed, 36 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/1fd37039/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java index 89a4c86..f6da73e 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java @@ -210,7 +210,7 @@ 
public final class TableDataMap extends OperationEventListener { /** * delete only the datamaps of the segments */ - public void deleteDatamapData(List<Segment> segments) { + public void deleteDatamapData(List<Segment> segments) throws IOException { for (Segment segment: segments) { dataMapFactory.deleteDatamapData(segment); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1fd37039/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java index b115462..0889f8b 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java @@ -113,7 +113,7 @@ public abstract class DataMapFactory<T extends DataMap> { /** * delete datamap data in the specified segment */ - public abstract void deleteDatamapData(Segment segment); + public abstract void deleteDatamapData(Segment segment) throws IOException; /** * delete datamap data if any http://git-wip-us.apache.org/repos/asf/carbondata/blob/1fd37039/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index 175a2a4..643cc45 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -331,7 +331,7 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory } 
@Override - public void deleteDatamapData(Segment segment) { + public void deleteDatamapData(Segment segment) throws IOException { } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1fd37039/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java ---------------------------------------------------------------------- diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java index 68cf45c..6183077 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java @@ -348,7 +348,7 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa } @Override - public void deleteDatamapData(Segment segment) { + public void deleteDatamapData(Segment segment) throws IOException { try { String segmentId = segment.getSegmentNo(); String datamapPath = CarbonTablePath @@ -358,8 +358,8 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa FileFactory.getFileType(datamapPath)); CarbonUtil.deleteFoldersAndFilesSilent(file); } - } catch (IOException | InterruptedException ex) { - LOGGER.error("Failed to delete datamap for segment_" + segment.getSegmentNo()); + } catch (InterruptedException ex) { + throw new IOException("Failed to delete datamap for segment_" + segment.getSegmentNo()); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1fd37039/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java ---------------------------------------------------------------------- diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java index 506b91a..1361d7a 100644 --- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java @@ -157,7 +157,7 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory { } @Override - public void deleteDatamapData(Segment segment) { + public void deleteDatamapData(Segment segment) throws IOException { } http://git-wip-us.apache.org/repos/asf/carbondata/blob/1fd37039/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java ---------------------------------------------------------------------- diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java index 8186841..cd225f1 100644 --- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java +++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java @@ -260,7 +260,8 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor } - @Override public void deleteDatamapData(Segment segment) { + @Override + public void deleteDatamapData(Segment segment) throws IOException { try { String segmentId = segment.getSegmentNo(); String datamapPath = CarbonTablePath @@ -270,13 +271,13 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> extends DataMapFactor FileFactory.getFileType(datamapPath)); CarbonUtil.deleteFoldersAndFilesSilent(file); } - } catch (IOException | InterruptedException ex) { - throw new RuntimeException( - "drop datamap failed, failed to delete datamap directory"); + } catch (InterruptedException ex) { + 
throw new IOException("drop datamap failed, failed to delete datamap directory"); } } - @Override public void deleteDatamapData() { + @Override + public void deleteDatamapData() { try { deleteDatamap(); } catch (MalformedDataMapCommandException ex) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/1fd37039/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala index 9a7cb99..046a2a6 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala @@ -139,7 +139,7 @@ class CGDataMapFactory( * delete datamap of the segment */ override def deleteDatamapData(segment: Segment): Unit = { - ??? 
+ } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/1fd37039/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala index 434c363..c015f8d 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala @@ -74,7 +74,7 @@ class C2DataMapFactory( * delete datamap of the segment */ override def deleteDatamapData(segment: Segment): Unit = { - ??? + } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/1fd37039/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala index 9866683..b13582b 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala @@ -132,7 +132,7 @@ class FGDataMapFactory(carbonTable: CarbonTable, * delete datamap of the segment */ override def deleteDatamapData(segment: Segment): Unit = { - ??? 
+ } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/1fd37039/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala index 0638a07..f1c9432 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala @@ -270,7 +270,7 @@ class TestDataMapFactory( * delete datamap of the segment */ override def deleteDatamapData(segment: Segment): Unit = { - ??? + } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/1fd37039/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 2334871..543ba30 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.util.CarbonException import org.apache.carbondata.common.constants.LoggerAction import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datamap.Segment +import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment} import 
org.apache.carbondata.core.datamap.status.DataMapStatusManager import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo} import org.apache.carbondata.core.datastore.filesystem.CarbonFile @@ -475,6 +475,7 @@ object CarbonDataRDDFactory { if (carbonLoadModel.isCarbonTransactionalTable) { // delete segment is applicable for transactional table CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt) + clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId) } LOGGER.info("********clean up done**********") LOGGER.audit(s"Data load is failed for " + @@ -493,6 +494,7 @@ object CarbonDataRDDFactory { if (carbonLoadModel.isCarbonTransactionalTable) { // delete segment is applicable for transactional table CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt) + clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId) } LOGGER.info("********clean up done**********") LOGGER.audit(s"Data load is failed for " + @@ -554,6 +556,7 @@ object CarbonDataRDDFactory { if (carbonLoadModel.isCarbonTransactionalTable) { // delete segment is applicable for transactional table CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt) + clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId) } LOGGER.info("********clean up done**********") LOGGER.audit("Data load is failed for " + @@ -589,6 +592,21 @@ object CarbonDataRDDFactory { } /** + * clear datamap files for segment + */ + private def clearDataMapFiles(carbonTable: CarbonTable, segmentId: String): Unit = { + try { + val segments = List(new Segment(segmentId)).asJava + DataMapStoreManager.getInstance().getAllDataMap(carbonTable).asScala + .filter(_.getDataMapSchema.isIndexDataMap) + .foreach(_.deleteDatamapData(segments)) + } catch { + case ex : Exception => + LOGGER.error(s"Failed to clear datamap files for" + + s" ${carbonTable.getDatabaseName}.${carbonTable.getTableName}") + } + } + /** * Add and update the segment 
files. In case of update scenario the carbonindex files are written * to the same segment so we need to update old segment file. So this method writes the latest data * to new segment file and merges this file with the old file to get latest updated files.