Repository: carbondata
Updated Branches:
  refs/heads/branch-1.5 f863cee9a -> ff7569a93


[CARBONDATA-3070] Fix partition load issue when custom location is added.

Problem:
Data load fails for the carbon file format when a custom partition location is added.

Reason:
Carbon generates its own file name for each carbondata file instead of using the
file name proposed by Spark, and it also creates an extra index file. For a custom
partition location, Spark keeps track of the files it names itself and moves them
during commit, but carbon creates and maintains different files, which leads to a
FileNotFoundException.

Solution:
Use a custom commit protocol to manage the commit and the folder location when a
custom partition location is used (see the configuration sketch below).

This closes #2873
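
For context, a minimal sketch of how the override takes effect (illustrative
only; the session setup below is assumed, while the config key and the protocol
class name are the ones the patch sets from SparkCarbonFileFormat):

import org.apache.spark.sql.SparkSession

object CommitProtocolConfigSketch {
  def main(args: Array[String]): Unit = {
    // Assumed local session for illustration; any SparkSession works the same way.
    val spark = SparkSession.builder()
      .appName("carbon-custom-partition-location")
      .master("local[*]")
      .getOrCreate()

    // Same key and class name that SparkCarbonFileFormat sets in this patch,
    // so writes to a custom partition location go through the carbon protocol.
    spark.conf.set(
      "spark.sql.sources.commitProtocolClass",
      "org.apache.spark.sql.carbondata.execution.datasources." +
        "CarbonSQLHadoopMapReduceCommitProtocol")

    spark.stop()
  }
}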


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f947efe4
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f947efe4
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f947efe4

Branch: refs/heads/branch-1.5
Commit: f947efe4ecdebdea92ce6b05777337cf2fd50107
Parents: b9720d3
Author: ravipesala <ravi.pes...@gmail.com>
Authored: Mon Oct 29 13:15:00 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Wed Nov 21 22:37:26 2018 +0530

----------------------------------------------------------------------
 .../datasources/SparkCarbonFileFormat.scala     | 87 +++++++++++++++++++-
 .../org/apache/spark/sql/CarbonVectorProxy.java |  3 +
 .../datasource/SparkCarbonDataSourceTest.scala  | 34 ++++++++
 3 files changed, 120 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f947efe4/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index cd2035c..8c2f200 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.carbondata.execution.datasources
 
+import java.net.URI
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
@@ -27,6 +29,7 @@ import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.memory.MemoryMode
 import org.apache.spark.sql._
 import org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUnsafeRowReadSuport
@@ -112,6 +115,13 @@ class SparkCarbonFileFormat extends FileFormat
   }
 
   /**
+   * Add our own protocol to control the commit.
+   */
+  SparkSession.getActiveSession.get.sessionState.conf.setConfString(
+    "spark.sql.sources.commitProtocolClass",
+    "org.apache.spark.sql.carbondata.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
+
+  /**
   * Prepares a write job and returns an [[OutputWriterFactory]].  Client side job preparation is
    * done here.
    */
@@ -125,6 +135,7 @@ class SparkCarbonFileFormat extends FileFormat
     val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
     model.setLoadWithoutConverterStep(true)
     CarbonTableOutputFormat.setLoadModel(conf, model)
+    conf.set(CarbonSQLHadoopMapReduceCommitProtocol.COMMIT_PROTOCOL, "true")
 
     new OutputWriterFactory {
       override def newInstance(
@@ -310,7 +321,6 @@ class SparkCarbonFileFormat extends FileFormat
     vectorizedReader.toBoolean && schema.forall(_.dataType.isInstanceOf[AtomicType])
   }
 
-
   /**
    * Returns whether this format support returning columnar batch or not.
    */
@@ -369,7 +379,7 @@ class SparkCarbonFileFormat extends FileFormat
 
       if (file.filePath.endsWith(CarbonTablePath.CARBON_DATA_EXT)) {
         val split = new CarbonInputSplit("null",
-          new Path(file.filePath),
+          new Path(new URI(file.filePath)),
           file.start,
           file.length,
           file.locations,
@@ -380,10 +390,12 @@ class SparkCarbonFileFormat extends FileFormat
         split.setDetailInfo(info)
         info.setBlockSize(file.length)
         // Read the footer offset and set.
-        val reader = FileFactory.getFileHolder(FileFactory.getFileType(file.filePath),
+        val reader = FileFactory.getFileHolder(FileFactory.getFileType(split.getPath.toString),
           broadcastedHadoopConf.value.value)
         val buffer = reader
-          .readByteBuffer(FileFactory.getUpdatedFilePath(file.filePath), file.length - 8, 8)
+          .readByteBuffer(FileFactory.getUpdatedFilePath(split.getPath.toString),
+            file.length - 8,
+            8)
         info.setBlockFooterOffset(buffer.getLong)
         info.setVersionNumber(split.getVersion.number())
         info.setUseMinMaxForPruning(true)
@@ -447,7 +459,74 @@ class SparkCarbonFileFormat extends FileFormat
     }
   }
 
+}
+
+/**
+ * Carbon writes two files (a carbondata file and an index file), but Spark only tracks the file
+ * it proposes, so this custom protocol copies the files in case of a custom partition location.
+ */
+case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
+  extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend) {
+
+  override def newTaskTempFileAbsPath(
+      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+    val carbonFlow = taskContext.getConfiguration.get(
+      CarbonSQLHadoopMapReduceCommitProtocol.COMMIT_PROTOCOL)
+    val tempPath = super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext)
+    // Call only in case of carbon flow.
+    if (carbonFlow != null) {
+      // Create subfolder with uuid and write carbondata files
+      val path = new Path(tempPath)
+      val uuid = path.getName.substring(0, path.getName.indexOf("-part-"))
+      new Path(new Path(path.getParent, uuid), path.getName).toString
+    } else {
+      tempPath
+    }
+  }
 
+  override def commitJob(jobContext: JobContext,
+      taskCommits: Seq[FileCommitProtocol.TaskCommitMessage]): Unit = {
+    val carbonFlow = jobContext.getConfiguration.get(
+      CarbonSQLHadoopMapReduceCommitProtocol.COMMIT_PROTOCOL)
+    var updatedTaskCommits = taskCommits
+    // Call only in case of carbon flow.
+    if (carbonFlow != null) {
+      val (allAbsPathFiles, allPartitionPaths) =
+        // spark 2.1 and 2.2 case
+        if (taskCommits.exists(_.obj.isInstanceOf[Map[String, String]])) {
+          (taskCommits.map(_.obj.asInstanceOf[Map[String, String]]), null)
+        } else {
+          // spark 2.3 and above
+          taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
+        }
+      val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
+      val fs = new Path(path).getFileSystem(jobContext.getConfiguration)
+      // Move files from stage directory to actual location.
+      filesToMove.foreach{case (src, dest) =>
+        val srcPath = new Path(src)
+        val name = srcPath.getName
+        // Get uuid from spark's stage filename
+        val uuid = name.substring(0, name.indexOf("-part-"))
+        // List all the files under the uuid location
+        val list = fs.listStatus(new Path(new Path(src).getParent, uuid))
+        // Move all these files to actual folder.
+        list.foreach{ f =>
+          fs.rename(f.getPath, new Path(new Path(dest).getParent, f.getPath.getName))
+        }
+      }
+      updatedTaskCommits = if (allPartitionPaths == null) {
+        taskCommits.map(f => new FileCommitProtocol.TaskCommitMessage(Map.empty))
+      } else {
+        taskCommits.zipWithIndex.map{f =>
+          new FileCommitProtocol.TaskCommitMessage((Map.empty, allPartitionPaths(f._2)))
+        }
+      }
+    }
+    super.commitJob(jobContext, updatedTaskCommits)
+  }
+}
+object CarbonSQLHadoopMapReduceCommitProtocol {
+  val COMMIT_PROTOCOL = "carbon.commit.protocol"
 }
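
For reference, a minimal sketch of the staging convention the protocol above
relies on (the temp path and destination are hypothetical; only the "-part-"
parsing and the per-UUID subfolder mirror newTaskTempFileAbsPath and commitJob):

import org.apache.hadoop.fs.Path

object CarbonStagingSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical temp path of the shape Spark proposes for an absolute-path
    // partition: "<random uuid>-part-<task id>-<job id><ext>".
    val tempPath = new Path(
      "/tmp/_temporary-job-1/0a1b2c3d-4e5f-6789-abcd-ef0123456789-part-00000-job-1.snappy")

    // newTaskTempFileAbsPath: the UUID prefix before "-part-" names a per-task
    // subfolder where carbon writes both its carbondata and index files.
    val name = tempPath.getName
    val uuid = name.substring(0, name.indexOf("-part-"))
    val stagingDir = new Path(tempPath.getParent, uuid)

    // commitJob: list everything under the UUID folder and rename each file into
    // the parent of the destination Spark recorded for the custom location.
    val destination = new Path("/warehouse/test_folder121/part-00000-job-1.snappy")
    val targetDir = destination.getParent

    println(s"stage under: $stagingDir")
    println(s"move to:     $targetDir")
  }
}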
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f947efe4/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
index c16d381..90e2cc5 100644
--- a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
+++ b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
@@ -464,6 +464,9 @@ public class CarbonVectorProxy {
     }
 
     public void reset() {
+      if (isConstant) {
+        return;
+      }
       isLoaded = false;
       vector.reset();
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f947efe4/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index a1a5b8e..7564158 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -1276,6 +1276,40 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     }
   }
 
+  test("test partition issue with add location") {
+    spark.sql("drop table if exists partitionTable_obs")
+    spark.sql("drop table if exists partitionTable_obs_par")
+    spark.sql(s"create table partitionTable_obs (id int,name String,email 
String) using carbon partitioned by(email) ")
+    spark.sql(s"create table partitionTable_obs_par (id int,name String,email 
String) using parquet partitioned by(email) ")
+    spark.sql("insert into partitionTable_obs select 1,'huawei','abc'")
+    spark.sql("insert into partitionTable_obs select 1,'huawei','bcd'")
+    spark.sql(s"alter table partitionTable_obs add partition (email='def') 
location '$warehouse1/test_folder121/'")
+    spark.sql("insert into partitionTable_obs select 1,'huawei','def'")
+
+    spark.sql("insert into partitionTable_obs_par select 1,'huawei','abc'")
+    spark.sql("insert into partitionTable_obs_par select 1,'huawei','bcd'")
+    spark.sql(s"alter table partitionTable_obs_par add partition (email='def') 
location '$warehouse1/test_folder122/'")
+    spark.sql("insert into partitionTable_obs_par select 1,'huawei','def'")
+
+    checkAnswer(spark.sql("select * from partitionTable_obs"), 
spark.sql("select * from partitionTable_obs_par"))
+    spark.sql("drop table if exists partitionTable_obs")
+    spark.sql("drop table if exists partitionTable_obs_par")
+  }
+
+  test("test multiple partition  select issue") {
+    spark.sql("drop table if exists t_carbn01b_hive")
+    spark.sql(s"drop table if exists t_carbn01b")
+    spark.sql("create table t_carbn01b_hive(Qty_day_avg INT,Qty_total 
INT,Sell_price BIGINT,Sell_pricep DOUBLE,Profit DECIMAL(3,2),Item_code 
String,Item_name String,Outlet_name String,Create_date String,Active_status 
String,Item_type_cd INT, Update_time TIMESTAMP, Discount_price DOUBLE)  using 
parquet partitioned by (Active_status,Item_type_cd, Update_time, 
Discount_price)")
+    spark.sql("create table t_carbn01b(Qty_day_avg INT,Qty_total 
INT,Sell_price BIGINT,Sell_pricep DOUBLE,Profit DECIMAL(3,2),Item_code 
String,Item_name String,Outlet_name String,Create_date String,Active_status 
String,Item_type_cd INT, Update_time TIMESTAMP, Discount_price DOUBLE)  using 
carbon partitioned by (Active_status,Item_type_cd, Update_time, 
Discount_price)")
+    spark.sql("insert into t_carbn01b partition(Active_status, 
Item_type_cd,Update_time,Discount_price) select * from t_carbn01b_hive")
+    spark.sql("alter table t_carbn01b add partition 
(active_status='xyz',Item_type_cd=12,Update_time=NULL,Discount_price='3000')")
+    spark.sql("insert overwrite table t_carbn01b select 'xyz', 12, 
74,3000,20000000,121.5,4.99,2.44,'RE3423ee','dddd', 'ssss','2012-01-02 
23:04:05.12', '2012-01-20'")
+    spark.sql("insert overwrite table t_carbn01b_hive select 'xyz', 12, 
74,3000,20000000,121.5,4.99,2.44,'RE3423ee','dddd', 'ssss','2012-01-02 
23:04:05.12', '2012-01-20'")
+    checkAnswer(spark.sql("select * from t_carbn01b_hive"), spark.sql("select 
* from t_carbn01b"))
+    spark.sql("drop table if exists t_carbn01b_hive")
+    spark.sql(s"drop table if exists t_carbn01b")
+  }
+
   override protected def beforeAll(): Unit = {
     drop
     createParquetTable
