This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 9aa5973  Fixed issues for Add Segment
9aa5973 is described below

commit 9aa597352aae2dd90fd8fc14faec3f2f73c9bf57
Author: manishnalla1994 <manish.nalla1...@gmail.com>
AuthorDate: Wed Oct 30 11:23:01 2019 +0530

    Fixed issues for Add Segment
    
    Issue1: When the format is given in uppercase, add segment fails with an
    unknown-format error.
    Solution1: Made the format option case-insensitive.
    
    Issue2: The same segment path could be added repeatedly.
    Solution2: Blocked adding a path that already exists in the table status
    file.
    
    Issue3: There was no validation that the given folder actually contains
    carbon index files.
    Solution3: Added that validation for the carbon format.
    
    This closes #3426
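
    For context, the user-visible effect of Issue1, mirroring the test
    change further below (table and path names are illustrative):
    
        // Both spellings are now accepted; only the lowercase one worked before.
        sql("alter table addsegment1 add segment options('path'='/tmp/addsegtest', 'format'='parquet')")
        sql("alter table addsegment1 add segment options('path'='/tmp/addsegtest', 'format'='PARQUET')")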
---
 .../core/datamap/DistributableDataMapFormat.java   |  1 +
 .../carbondata/core/metadata/SegmentFileStore.java | 24 +++++++++++++++-------
 .../carbondata/core/statusmanager/FileFormat.java  |  4 ++--
 .../testsuite/addsegment/AddSegmentTestCase.scala  |  5 ++---
 .../command/management/CarbonAddLoadCommand.scala  | 20 ++++++++++++++++--
 .../execution/strategy/MixedFormatHandler.scala    |  2 +-
 .../carbondata/TestStreamingTableOpName.scala      |  6 +++---
 .../TestStreamingTableWithRowParser.scala          |  6 +++---
 8 files changed, 47 insertions(+), 21 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
index 5af68b9..a4d02cb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
@@ -225,6 +225,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
     if (partitions == null) {
       out.writeBoolean(false);
     } else {
+      out.writeBoolean(true);
       out.writeInt(partitions.size());
       for (PartitionSpec partitionSpec : partitions) {
         partitionSpec.write(out);
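
The one-line fix above makes the write side self-consistent: a reader that checks a boolean presence flag before reading the partition count needs `true` to be written in the non-null branch as well. A minimal sketch of the pattern with simplified types (the actual CarbonData read side is not shown in this diff):

    import java.io.{DataInputStream, DataOutputStream}

    // Writer: presence flag first, then the payload. Omitting the flag in the
    // non-null branch (the bug fixed above) desynchronises the reader.
    def writeList(out: DataOutputStream, items: java.util.List[String]): Unit = {
      if (items == null) {
        out.writeBoolean(false)          // absent: nothing else follows
      } else {
        out.writeBoolean(true)           // present: the flag this commit adds
        out.writeInt(items.size())
        items.forEach(item => out.writeUTF(item))
      }
    }

    // Reader: mirrors the writer field for field.
    def readList(in: DataInputStream): java.util.List[String] = {
      if (!in.readBoolean()) {
        null
      } else {
        val size = in.readInt()
        val list = new java.util.ArrayList[String](size)
        (0 until size).foreach(_ => list.add(in.readUTF()))
        list
      }
    }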
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 511687e..b03dbf4 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -184,6 +184,22 @@ public class SegmentFileStore {
     return writeSegmentFile(carbonTable, segmentId, UUID, null, segPath);
   }
 
+  /**
+   * Returns the carbon index files (index and merge-index) under the given segment path.
+   *
+   * @param segmentPath path of the segment folder to scan
+   * @return the matching index files; may be null or empty if none are found
+   */
+  public static CarbonFile[] getListOfCarbonIndexFiles(String segmentPath) {
+    CarbonFile segmentFolder = FileFactory.getCarbonFile(segmentPath);
+    CarbonFile[] indexFiles = segmentFolder.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+        return (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
+            file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT));
+      }
+    });
+    return indexFiles;
+  }
 
   /**
    * Write segment file to the metadata folder of the table.
@@ -195,13 +211,7 @@ public class SegmentFileStore {
   public static boolean writeSegmentFile(CarbonTable carbonTable, Segment segment)
       throws IOException {
     String tablePath = carbonTable.getTablePath();
-    CarbonFile segmentFolder = FileFactory.getCarbonFile(segment.getSegmentPath());
-    CarbonFile[] indexFiles = segmentFolder.listFiles(new CarbonFileFilter() {
-      @Override public boolean accept(CarbonFile file) {
-        return (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
-            .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT));
-      }
-    });
+    CarbonFile[] indexFiles = getListOfCarbonIndexFiles(segment.getSegmentPath());
     if (indexFiles != null && indexFiles.length > 0) {
       SegmentFile segmentFile = new SegmentFile();
       segmentFile.setOptions(segment.getOptions());
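
A hedged usage sketch of the extracted helper (the segmentPath value is whatever folder the caller passes in); this is the check CarbonAddLoadCommand performs further below:

    // List the .carbonindex / merge-index files under a segment folder and
    // test whether any exist; callers also guard against a null result.
    val indexFiles = SegmentFileStore.getListOfCarbonIndexFiles(segmentPath)
    val hasIndexFiles = indexFiles != null && indexFiles.nonEmpty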
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
index 4b79eb6..6f098e2 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
@@ -33,11 +33,11 @@ public class FileFormat implements Serializable {
   private int ordinal;
 
   public FileFormat(String format) {
-    this.format = format;
+    this.format = format.toLowerCase();
   }
 
   public FileFormat(String format, int ordinal) {
-    this.format = format;
+    this.format = format.toLowerCase();
     this.ordinal = ordinal;
   }
 
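
With this change every FileFormat instance stores its format name in lowercase, which is why the streaming tests below compare against row.getString(5).toLowerCase. A small sketch of the effect, assuming comparisons are keyed on the stored format string:

    val a = new FileFormat("PARQUET")   // stored internally as "parquet"
    val b = new FileFormat("parquet")   // stored internally as "parquet"
    // Any equality check or lookup based on the stored string is now
    // insensitive to the case the caller used.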
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index e39272a..8da73e2 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.sql.{CarbonEnv, DataFrame, Row}
 import org.scalatest.BeforeAndAfterAll
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.row.CarbonRow
@@ -31,9 +32,7 @@ import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport
 import org.apache.carbondata.sdk.file.{CarbonReader, CarbonWriter}
-import org.apache.carbondata.spark.rdd.CarbonScanRDD
 import org.junit.Assert
-
 import scala.io.Source
 
 class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
@@ -542,7 +541,7 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     copy(path.toString, newPath)
     checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
 
-    sql(s"alter table addsegment1 add segment options('path'='$newPath', 
'format'='parquet')").show()
+    sql(s"alter table addsegment1 add segment options('path'='$newPath', 
'format'='PARQUET')").show()
     checkExistence(sql(s"show segments for table addsegment1"), true, 
"spark-common/target/warehouse/addsegtest")
     checkExistence(sql(s"show history segments for table addsegment1"), true, 
"spark-common/target/warehouse/addsegtest")
     FileFactory.deleteAllFilesOfDir(new File(newPath))
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
index 34d22a7..7b2c088 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
@@ -92,6 +92,24 @@ case class CarbonAddLoadCommand(
     val segmentPath = options.getOrElse(
       "path", throw new UnsupportedOperationException("PATH is manadatory"))
 
+    val allSegments = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+
+    // If a path is already added, block adding the same path again.
+    if (allSegments.exists(a =>
+      a.getPath != null && a.getPath.equalsIgnoreCase(segmentPath)
+    )) {
+      throw new AnalysisException(s"path already exists in table status file, can not add same " +
+                                  s"segment path repeatedly: $segmentPath")
+    }
+
+    val format = options.getOrElse("format", "carbondata")
+    val isCarbonFormat = format.equalsIgnoreCase("carbondata") || format.equalsIgnoreCase("carbon")
+
+    // If no carbon index files are found in the given location, throw an exception.
+    if (isCarbonFormat && SegmentFileStore.getListOfCarbonIndexFiles(segmentPath).isEmpty) {
+      throw new AnalysisException("CarbonIndex files not present in the location")
+    }
+
     val segSchema = MixedFormatHandler.getSchema(sparkSession, options, segmentPath)
 
     val segCarbonSchema = new Schema(segSchema.fields.map { field =>
@@ -142,8 +160,6 @@ case class CarbonAddLoadCommand(
       model.getFactTimeStamp,
       false)
     newLoadMetaEntry.setPath(segmentPath)
-    val format = options.getOrElse("format", "carbondata")
-    val isCarbonFormat = format.equals("carbondata") || format.equals("carbon")
     if (!isCarbonFormat) {
       newLoadMetaEntry.setFileFormat(new FileFormat(format))
     }
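
The duplicate-path validation added above can also be read as one reusable guard; a simplified sketch with plain Scala collections (the helper name and parameters are hypothetical, not part of this commit):

    import org.apache.spark.sql.AnalysisException

    // Hypothetical helper mirroring the duplicate-path guard added in
    // CarbonAddLoadCommand: reject a segment path already recorded in the
    // table status file.
    def validateNewSegmentPath(existingPaths: Seq[String], segmentPath: String): Unit = {
      if (existingPaths.exists(p => p != null && p.equalsIgnoreCase(segmentPath))) {
        throw new AnalysisException(s"path already exists in table status file, " +
          s"can not add same segment path repeatedly: $segmentPath")
      }
    }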
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
index a54645d..26c0fb0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
@@ -60,7 +60,7 @@ object MixedFormatHandler {
       options: Map[String, String],
       segPath: String): StructType = {
     val format = options.getOrElse("format", "carbondata")
-    if ((format.equals("carbondata") || format.equals("carbon"))) {
+    if ((format.equalsIgnoreCase("carbondata") || format.equalsIgnoreCase("carbon"))) {
       new SparkCarbonFileFormat().inferSchema(sparkSession, options, Seq.empty).get
     } else {
       val filePath = FileFactory.addSchemeIfNotExists(segPath.replace("\\", "/"))
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
index f57c593..ade3567 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
@@ -1061,13 +1061,13 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
     result1.foreach { row =>
       if (row.getString(0).equals("1")) {
         assertResult(SegmentStatus.STREAMING.getMessage)(row.getString(1))
-        assertResult(FileFormat.ROW_V1.toString)(row.getString(5))
+        assertResult(FileFormat.ROW_V1.toString)(row.getString(5).toLowerCase)
       } else if (row.getString(0).equals("0.1")) {
         assertResult(SegmentStatus.SUCCESS.getMessage)(row.getString(1))
-        assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5))
+        assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5).toLowerCase)
       } else {
         assertResult(SegmentStatus.COMPACTED.getMessage)(row.getString(1))
-        assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5))
+        assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5).toLowerCase)
       }
     }
 
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
index 71d94b7..1672e75 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
@@ -397,13 +397,13 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
     result1.foreach { row =>
       if (row.getString(0).equals("1")) {
         assertResult(SegmentStatus.STREAMING.getMessage)(row.getString(1))
-        assertResult(FileFormat.ROW_V1.toString)(row.getString(5))
+        assertResult(FileFormat.ROW_V1.toString)(row.getString(5).toLowerCase)
       } else if (row.getString(0).equals("0.1")) {
         assertResult(SegmentStatus.SUCCESS.getMessage)(row.getString(1))
-        assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5))
+        assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5).toLowerCase)
       } else {
         assertResult(SegmentStatus.COMPACTED.getMessage)(row.getString(1))
-        assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5))
+        assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5).toLowerCase)
       }
     }
 
