This is an automated email from the ASF dual-hosted git repository. ajantha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 9aa5973 Fixed issues for Add Segment 9aa5973 is described below commit 9aa597352aae2dd90fd8fc14faec3f2f73c9bf57 Author: manishnalla1994 <manish.nalla1...@gmail.com> AuthorDate: Wed Oct 30 11:23:01 2019 +0530 Fixed issues for Add Segment Issue1 : When the format is given in uppercase, add segment fails with unknown format. Solution1 : Made format case-insensitive. Issue2 : The same path is being added repeatedly, blocked this operation. Issue3 : Added validation for the folder not containing carbon files. This closes #3426 --- .../core/datamap/DistributableDataMapFormat.java | 1 + .../carbondata/core/metadata/SegmentFileStore.java | 24 +++++++++++++++------- .../carbondata/core/statusmanager/FileFormat.java | 4 ++-- .../testsuite/addsegment/AddSegmentTestCase.scala | 5 ++--- .../command/management/CarbonAddLoadCommand.scala | 20 ++++++++++++++++-- .../execution/strategy/MixedFormatHandler.scala | 2 +- .../carbondata/TestStreamingTableOpName.scala | 6 +++--- .../TestStreamingTableWithRowParser.scala | 6 +++--- 8 files changed, 47 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java index 5af68b9..a4d02cb 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java @@ -225,6 +225,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl if (partitions == null) { out.writeBoolean(false); } else { + out.writeBoolean(true); out.writeInt(partitions.size()); for (PartitionSpec partitionSpec : partitions) { partitionSpec.write(out); diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java index 511687e..b03dbf4 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java @@ -184,6 +184,22 @@ public class SegmentFileStore { return writeSegmentFile(carbonTable, segmentId, UUID, null, segPath); } + /** + * Returns the list of index files + * + * @param segmentPath + * @return + */ + public static CarbonFile[] getListOfCarbonIndexFiles(String segmentPath) { + CarbonFile segmentFolder = FileFactory.getCarbonFile(segmentPath); + CarbonFile[] indexFiles = segmentFolder.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || + file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)); + } + }); + return indexFiles; + } /** * Write segment file to the metadata folder of the table. @@ -195,13 +211,7 @@ public class SegmentFileStore { public static boolean writeSegmentFile(CarbonTable carbonTable, Segment segment) throws IOException { String tablePath = carbonTable.getTablePath(); - CarbonFile segmentFolder = FileFactory.getCarbonFile(segment.getSegmentPath()); - CarbonFile[] indexFiles = segmentFolder.listFiles(new CarbonFileFilter() { - @Override public boolean accept(CarbonFile file) { - return (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName() - .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)); - } - }); + CarbonFile[] indexFiles = getListOfCarbonIndexFiles(segment.getSegmentPath()); if (indexFiles != null && indexFiles.length > 0) { SegmentFile segmentFile = new SegmentFile(); segmentFile.setOptions(segment.getOptions()); diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java index 4b79eb6..6f098e2 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java @@ -33,11 +33,11 @@ public class FileFormat implements Serializable { private int ordinal; public FileFormat(String format) { - this.format = format; + this.format = format.toLowerCase(); } public FileFormat(String format, int ordinal) { - this.format = format; + this.format = format.toLowerCase(); this.ordinal = ordinal; } diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala index e39272a..8da73e2 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.test.util.QueryTest import org.apache.spark.sql.util.SparkSQLUtil import org.apache.spark.sql.{CarbonEnv, DataFrame, Row} import org.scalatest.BeforeAndAfterAll + import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.datastore.row.CarbonRow @@ -31,9 +32,7 @@ import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport import org.apache.carbondata.sdk.file.{CarbonReader, CarbonWriter} -import org.apache.carbondata.spark.rdd.CarbonScanRDD import org.junit.Assert - import scala.io.Source class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll { @@ -542,7 +541,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll { copy(path.toString, newPath) checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30))) - sql(s"alter table addsegment1 add segment options('path'='$newPath', 'format'='parquet')").show() + sql(s"alter table addsegment1 add segment options('path'='$newPath', 'format'='PARQUET')").show() checkExistence(sql(s"show segments for table addsegment1"), true, "spark-common/target/warehouse/addsegtest") checkExistence(sql(s"show history segments for table addsegment1"), true, "spark-common/target/warehouse/addsegtest") FileFactory.deleteAllFilesOfDir(new File(newPath)) diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala index 34d22a7..7b2c088 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala @@ -92,6 +92,24 @@ case class CarbonAddLoadCommand( val segmentPath = options.getOrElse( "path", throw new UnsupportedOperationException("PATH is manadatory")) + val allSegments = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + + // If a path is already added then we should block the adding of the same path again. + if (allSegments.exists(a => + a.getPath != null && a.getPath.equalsIgnoreCase(segmentPath) + )) { + throw new AnalysisException(s"path already exists in table status file, can not add same " + + s"segment path repeatedly: $segmentPath") + } + + val format = options.getOrElse("format", "carbondata") + val isCarbonFormat = format.equalsIgnoreCase("carbondata") || format.equalsIgnoreCase("carbon") + + // If in the given location no carbon index files are found then we should throw an exception + if (isCarbonFormat && SegmentFileStore.getListOfCarbonIndexFiles(segmentPath).isEmpty) { + throw new AnalysisException("CarbonIndex files not present in the location") + } + val segSchema = MixedFormatHandler.getSchema(sparkSession, options, segmentPath) val segCarbonSchema = new Schema(segSchema.fields.map { field => @@ -142,8 +160,6 @@ case class CarbonAddLoadCommand( model.getFactTimeStamp, false) newLoadMetaEntry.setPath(segmentPath) - val format = options.getOrElse("format", "carbondata") - val isCarbonFormat = format.equals("carbondata") || format.equals("carbon") if (!isCarbonFormat) { newLoadMetaEntry.setFileFormat(new FileFormat(format)) } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala index a54645d..26c0fb0 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala @@ -60,7 +60,7 @@ object MixedFormatHandler { options: Map[String, String], segPath: String): StructType = { val format = options.getOrElse("format", "carbondata") - if ((format.equals("carbondata") || format.equals("carbon"))) { + if ((format.equalsIgnoreCase("carbondata") || format.equalsIgnoreCase("carbon"))) { new SparkCarbonFileFormat().inferSchema(sparkSession, options, Seq.empty).get } else { val filePath = FileFactory.addSchemeIfNotExists(segPath.replace("\\", "/")) diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala index f57c593..ade3567 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala @@ -1061,13 +1061,13 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll { result1.foreach { row => if (row.getString(0).equals("1")) { assertResult(SegmentStatus.STREAMING.getMessage)(row.getString(1)) - assertResult(FileFormat.ROW_V1.toString)(row.getString(5)) + assertResult(FileFormat.ROW_V1.toString)(row.getString(5).toLowerCase) } else if (row.getString(0).equals("0.1")) { assertResult(SegmentStatus.SUCCESS.getMessage)(row.getString(1)) - assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5)) + assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5).toLowerCase) } else { assertResult(SegmentStatus.COMPACTED.getMessage)(row.getString(1)) - assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5)) + assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5).toLowerCase) } } diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala index 71d94b7..1672e75 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala @@ -397,13 +397,13 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll { result1.foreach { row => if (row.getString(0).equals("1")) { assertResult(SegmentStatus.STREAMING.getMessage)(row.getString(1)) - assertResult(FileFormat.ROW_V1.toString)(row.getString(5)) + assertResult(FileFormat.ROW_V1.toString)(row.getString(5).toLowerCase) } else if (row.getString(0).equals("0.1")) { assertResult(SegmentStatus.SUCCESS.getMessage)(row.getString(1)) - assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5)) + assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5).toLowerCase) } else { assertResult(SegmentStatus.COMPACTED.getMessage)(row.getString(1)) - assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5)) + assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5).toLowerCase) } }