This is an automated email from the ASF dual-hosted git repository. ajantha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 02efddc [CARBONDATA-3790] Fix SI table consistency with main table segments 02efddc is described below commit 02efddcb0c1f514e74ccab7bc6204c93e551a0f0 Author: akashrn5 <akashnilu...@gmail.com> AuthorDate: Thu Apr 30 14:41:10 2020 +0530 [CARBONDATA-3790] Fix SI table consistency with main table segments Why is this PR needed? Consider a scenario where SI loading happens after the main table load; when taking the segment lock, we hit an issue and skipped the segment, assuming that the missed segments would be loaded in the next load by SILoadEventListenerForFailedSegments, but the status of the SI is still enabled. However, SILoadEventListenerForFailedSegments will only load the skipped segments if the status is disabled, which will lead to a segment mismatch between the main and SI tables, and which may lead to query failure or data mismatch. What changes were proposed in this PR? If it fails to take the segment lock during SI load, add the segment to a skip list; if that list is not empty, disable the SI, so that SILoadEventListenerForFailedSegments will take care of loading the missing segments in the next load to the main table. Does this PR introduce any user interface change? No Is any new test case added? 
No This closes #3734 --- .../sql/secondaryindex/rdd/SecondaryIndexCreator.scala | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala index ec63065..04cfbb0 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala @@ -91,6 +91,7 @@ object SecondaryIndexCreator { var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty val validSegments: java.util.List[String] = new util.ArrayList[String]() + val skippedSegments: java.util.List[String] = new util.ArrayList[String]() var validSegmentList = List.empty[String] try { @@ -110,6 +111,7 @@ object SecondaryIndexCreator { // skipped segments load will be handled in SILoadEventListenerForFailedSegments validSegments.add(eachSegment) } else { + skippedSegments.add(eachSegment) LOGGER.error(s"Not able to acquire the segment lock for table" + s" ${indexCarbonTable.getTableUniqueName} for segment: $eachSegment. " + s"Skipping this segment from loading.") @@ -321,6 +323,18 @@ object SecondaryIndexCreator { segmentLocks.foreach(segmentLock => { segmentLock.unlock() }) + // if some segments are skipped, disable the SI table so that + // SILoadEventListenerForFailedSegments will take care to load to these segments in next + // consecutive load to main table. + if (!skippedSegments.isEmpty) { + secondaryIndexModel.sqlContext.sparkSession.sql( + s"""ALTER TABLE ${ + secondaryIndexModel + .carbonLoadModel + .getDatabaseName + }.${ secondaryIndexModel.secondaryIndex.indexName } SET + |SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin).collect() + } try { if (!isCompactionCall) { SegmentStatusManager