This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bf65269  [HUDI-1230] Fix for preventing MOR datasource jobs from hanging via spark-submit (#2046)
bf65269 is described below

commit bf65269f66075069b421dfa25f90b105bc4ec662
Author: Udit Mehrotra <udit.mehrotr...@gmail.com>
AuthorDate: Thu Sep 17 20:03:35 2020 -0700

    [HUDI-1230] Fix for preventing MOR datasource jobs from hanging via spark-submit (#2046)
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  10 +-
 .../functional/HoodieSparkSqlWriterSuite.scala     | 138 ++++++++++++++++-----
 2 files changed, 110 insertions(+), 38 deletions(-)
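
In short, the change gates async compaction on whether an async compaction trigger function was actually passed to HoodieSparkSqlWriter.write(). A batch write submitted via spark-submit supplies no such function, so async compaction is never triggered and (per the commit title) the job no longer hangs after the write completes. A minimal sketch of the resulting check, condensed from the isAsyncCompactionEnabled hunk below and not the exact method body:

    // Async compaction is considered enabled only when all of these hold:
    // a trigger function was supplied (e.g. by a streaming sink), inline
    // compaction is off, the user opted in, and the table is MERGE_ON_READ.
    asyncCompactionTriggerFnDefined &&
      !client.getConfig.isInlineCompaction &&
      parameters.get(ASYNC_COMPACT_ENABLE_OPT_KEY).exists(_.toBoolean) &&
      tableConfig.getTableType == HoodieTableType.MERGE_ON_READ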

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 569ed34..450bd73 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -52,6 +52,7 @@ private[hudi] object HoodieSparkSqlWriter {
 
   private val log = LogManager.getLogger(getClass)
   private var tableExists: Boolean = false
+  private var asyncCompactionTriggerFnDefined: Boolean = false
 
   def write(sqlContext: SQLContext,
             mode: SaveMode,
@@ -67,6 +68,7 @@ private[hudi] object HoodieSparkSqlWriter {
     val sparkContext = sqlContext.sparkContext
     val path = parameters.get("path")
     val tblNameOp = parameters.get(HoodieWriteConfig.TABLE_NAME)
+    asyncCompactionTriggerFnDefined = asyncCompactionTriggerFn.isDefined
     if (path.isEmpty || tblNameOp.isEmpty) {
       throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.")
     }
@@ -147,8 +149,7 @@ private[hudi] object HoodieSparkSqlWriter {
             tblName, mapAsJavaMap(parameters)
           )).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
 
-          if (asyncCompactionTriggerFn.isDefined &&
-            isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
+          if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
             asyncCompactionTriggerFn.get.apply(client)
           }
 
@@ -187,8 +188,7 @@ private[hudi] object HoodieSparkSqlWriter {
             Schema.create(Schema.Type.NULL).toString, path.get, tblName,
             mapAsJavaMap(parameters))).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
 
-          if (asyncCompactionTriggerFn.isDefined &&
-            isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
+          if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
             asyncCompactionTriggerFn.get.apply(client)
           }
 
@@ -441,7 +441,7 @@ private[hudi] object HoodieSparkSqlWriter {
                                        tableConfig: HoodieTableConfig,
                                        parameters: Map[String, String], configuration: Configuration) : Boolean = {
     log.info(s"Config.isInlineCompaction ? ${client.getConfig.isInlineCompaction}")
-    if (!client.getConfig.isInlineCompaction
+    if (asyncCompactionTriggerFnDefined && !client.getConfig.isInlineCompaction
       && parameters.get(ASYNC_COMPACT_ENABLE_OPT_KEY).exists(r => r.toBoolean)) {
       tableConfig.getTableType == HoodieTableType.MERGE_ON_READ
     } else {
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index 8995b7c..bcd83db 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -22,17 +22,29 @@ import java.util.{Date, UUID}
 
 import org.apache.commons.io.FileUtils
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.client.HoodieWriteClient
+import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.keygen.SimpleKeyGenerator
 import org.apache.hudi.testutils.DataSourceTestUtils
-import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
-import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
+import org.apache.spark.SparkContext
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession}
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.{FunSuite, Matchers}
 
+import scala.collection.JavaConversions._
+
 class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
 
+  var spark: SparkSession = _
+  var sc: SparkContext = _
+  var sqlContext: SQLContext = _
+
   test("Parameters With Write Defaults") {
     val originals = HoodieWriterUtils.parametersWithWriteDefaults(Map.empty)
     val rhsKey = "hoodie.right.hand.side.key"
@@ -65,15 +77,10 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
 
   test("throw hoodie exception when there already exist a table with different 
name with Append Save mode") {
 
-    val session = SparkSession.builder()
-      .appName("test_append_mode")
-      .master("local[2]")
-      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .getOrCreate()
+    initSparkContext("test_append_mode")
     val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
     try {
 
-      val sqlContext = session.sqlContext
       val hoodieFooTableName = "hoodie_foo_tbl"
 
       //create a new table
@@ -82,7 +89,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
         "hoodie.insert.shuffle.parallelism" -> "4",
         "hoodie.upsert.shuffle.parallelism" -> "4")
       val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
-      val dataFrame = session.createDataFrame(Seq(Test(UUID.randomUUID().toString, new Date().getTime)))
+      val dataFrame = spark.createDataFrame(Seq(Test(UUID.randomUUID().toString, new Date().getTime)))
       HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame)
 
       //on same path try append with different("hoodie_bar_tbl") table name which should throw an exception
@@ -91,7 +98,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
         "hoodie.insert.shuffle.parallelism" -> "4",
         "hoodie.upsert.shuffle.parallelism" -> "4")
       val barTableParams = HoodieWriterUtils.parametersWithWriteDefaults(barTableModifier)
-      val dataFrame2 = session.createDataFrame(Seq(Test(UUID.randomUUID().toString, new Date().getTime)))
+      val dataFrame2 = spark.createDataFrame(Seq(Test(UUID.randomUUID().toString, new Date().getTime)))
       val tableAlreadyExistException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, barTableParams, dataFrame2))
       assert(tableAlreadyExistException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
 
@@ -100,22 +107,16 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       val deleteCmdException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, deleteTableParams, dataFrame2))
       assert(deleteCmdException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
     } finally {
-      session.stop()
+      spark.stop()
       FileUtils.deleteDirectory(path.toFile)
     }
   }
 
   test("test bulk insert dataset with datasource impl") {
-    val session = SparkSession.builder()
-      .appName("test_bulk_insert_datasource")
-      .master("local[2]")
-      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .getOrCreate()
+    initSparkContext("test_bulk_insert_datasource")
     val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
     try {
 
-      val sqlContext = session.sqlContext
-      val sc = session.sparkContext
       val hoodieFooTableName = "hoodie_foo_tbl"
 
       //create a new table
@@ -134,7 +135,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
       val records = DataSourceTestUtils.generateRandomRows(100)
       val recordsSeq = convertRowListToSeq(records)
-      val df = session.createDataFrame(sc.parallelize(recordsSeq), structType)
+      val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
       // write to Hudi
       HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
 
@@ -148,7 +149,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       }
 
       // fetch all records from parquet files generated from write to hudi
-      val actualDf = session.sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+      val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
 
       // remove metadata columns so that expected and actual DFs can be compared as is
       val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
@@ -157,22 +158,16 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
 
       assert(df.except(trimmedDf).count() == 0)
     } finally {
-      session.stop()
+      spark.stop()
       FileUtils.deleteDirectory(path.toFile)
     }
   }
 
   test("test bulk insert dataset with datasource impl multiple rounds") {
-    val session = SparkSession.builder()
-      .appName("test_bulk_insert_datasource")
-      .master("local[2]")
-      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .getOrCreate()
+    initSparkContext("test_bulk_insert_datasource")
     val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
     try {
 
-      val sqlContext = session.sqlContext
-      val sc = session.sparkContext
       val hoodieFooTableName = "hoodie_foo_tbl"
 
       //create a new table
@@ -194,18 +189,18 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
 
       val schema = DataSourceTestUtils.getStructTypeExampleSchema
       val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
-      var totalExpectedDf = session.createDataFrame(sc.emptyRDD[Row], structType)
+      var totalExpectedDf = spark.createDataFrame(sc.emptyRDD[Row], structType)
 
       for (_ <- 0 to 2) {
         // generate the inserts
         val records = DataSourceTestUtils.generateRandomRows(200)
         val recordsSeq = convertRowListToSeq(records)
-        val df = session.createDataFrame(sc.parallelize(recordsSeq), structType)
+        val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
         // write to Hudi
         HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
 
         // Fetch records from entire dataset
-        val actualDf = session.sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+        val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
 
         // remove metadata columns so that expected and actual DFs can be compared as is
         val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
@@ -218,11 +213,78 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
         assert(totalExpectedDf.except(trimmedDf).count() == 0)
       }
     } finally {
-      session.stop()
+      spark.stop()
       FileUtils.deleteDirectory(path.toFile)
     }
   }
 
+  List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+    .foreach(tableType => {
+      test("test basic HoodieSparkSqlWriter functionality with datasource insert for " + tableType) {
+        initSparkContext("test_insert_datasource")
+        val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+        try {
+
+          val hoodieFooTableName = "hoodie_foo_tbl"
+
+          //create a new table
+          val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+            HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+            DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> tableType,
+            HoodieWriteConfig.INSERT_PARALLELISM -> "4",
+            DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+            DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+            DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+            DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[SimpleKeyGenerator].getCanonicalName)
+          val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+
+          // generate the inserts
+          val schema = DataSourceTestUtils.getStructTypeExampleSchema
+          val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+          val records = DataSourceTestUtils.generateRandomRows(100)
+          val recordsSeq = convertRowListToSeq(records)
+          val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+
+          val client = spy(DataSourceUtils.createHoodieClient(
+            new JavaSparkContext(sc),
+            schema.toString,
+            path.toAbsolutePath.toString,
+            hoodieFooTableName,
+            mapAsJavaMap(fooTableParams)).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]])
+
+          // write to Hudi
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, 
fooTableParams, df, Option.empty,
+            Option(client))
+          // Verify that asynchronous compaction is not scheduled
+          verify(client, times(0)).scheduleCompaction(any())
+          // Verify that HoodieWriteClient is closed correctly
+          verify(client, times(1)).close()
+
+          // collect all partition paths to issue read of parquet files
+          val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+            HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
+          // Check the entire dataset has all records still
+          val fullPartitionPaths = new Array[String](3)
+          for (i <- fullPartitionPaths.indices) {
+            fullPartitionPaths(i) = String.format("%s/%s/*", path.toAbsolutePath.toString, partitions(i))
+          }
+
+          // fetch all records from parquet files generated from write to hudi
+          val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+
+          // remove metadata columns so that expected and actual DFs can be compared as is
+          val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
+            .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
+            .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
+
+          assert(df.except(trimmedDf).count() == 0)
+        } finally {
+          spark.stop()
+          FileUtils.deleteDirectory(path.toFile)
+        }
+      }
+    })
+
   case class Test(uuid: String, ts: Long)
 
   import scala.collection.JavaConverters
@@ -230,4 +292,14 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
   def convertRowListToSeq(inputList: util.List[Row]): Seq[Row] =
     JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
 
+  def initSparkContext(appName: String): Unit = {
+    spark = SparkSession.builder()
+      .appName(appName)
+      .master("local[2]")
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .getOrCreate()
+    sc = spark.sparkContext
+    sc.setLogLevel("ERROR")
+    sqlContext = spark.sqlContext
+  }
 }
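
For reference, a minimal sketch of the batch-write scenario the new test covers, using the same optional-argument overload of HoodieSparkSqlWriter.write exercised above (no async compaction trigger function is supplied anywhere on this path); basePath, tableName, params, df, schema, sc and sqlContext are placeholders standing in for the test fixtures:

    // Batch-style datasource write: no trigger function is supplied, so even
    // for a MERGE_ON_READ table no async compaction may be scheduled, and the
    // spied client lets the test verify exactly that.
    val client = spy(DataSourceUtils.createHoodieClient(
      new JavaSparkContext(sc), schema.toString, basePath, tableName,
      mapAsJavaMap(params)).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]])

    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, params, df,
      Option.empty,     // optional argument left unset, matching the test above
      Option(client))   // inject the spied client so its calls can be verified

    verify(client, times(0)).scheduleCompaction(any())  // async compaction never scheduled
    verify(client, times(1)).close()                    // client closed, so the driver can exit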
