This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git

The following commit(s) were added to refs/heads/master by this push:
     new bf65269  [HUDI-1230] Fix for preventing MOR datasource jobs from hanging via spark-submit (#2046)
bf65269 is described below

commit bf65269f66075069b421dfa25f90b105bc4ec662
Author: Udit Mehrotra <udit.mehrotr...@gmail.com>
AuthorDate: Thu Sep 17 20:03:35 2020 -0700

    [HUDI-1230] Fix for preventing MOR datasource jobs from hanging via spark-submit (#2046)
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  10 +-
 .../functional/HoodieSparkSqlWriterSuite.scala     | 138 ++++++++++++++++-----
 2 files changed, 110 insertions(+), 38 deletions(-)

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 569ed34..450bd73 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -52,6 +52,7 @@ private[hudi] object HoodieSparkSqlWriter {
   private val log = LogManager.getLogger(getClass)
   private var tableExists: Boolean = false
+  private var asyncCompactionTriggerFnDefined: Boolean = false
   def write(sqlContext: SQLContext,
             mode: SaveMode,
@@ -67,6 +68,7 @@ private[hudi] object HoodieSparkSqlWriter {
     val sparkContext = sqlContext.sparkContext
     val path = parameters.get("path")
     val tblNameOp = parameters.get(HoodieWriteConfig.TABLE_NAME)
+    asyncCompactionTriggerFnDefined = asyncCompactionTriggerFn.isDefined
     if (path.isEmpty || tblNameOp.isEmpty) {
       throw new HoodieException(s"'${HoodieWriteConfig.TABLE_NAME}', 'path' must be set.")
     }
@@ -147,8 +149,7 @@ private[hudi] object HoodieSparkSqlWriter {
           tblName, mapAsJavaMap(parameters)
         )).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
-        if (asyncCompactionTriggerFn.isDefined &&
-          isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
+        if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
           asyncCompactionTriggerFn.get.apply(client)
         }
@@ -187,8 +188,7 @@ private[hudi] object HoodieSparkSqlWriter {
         Schema.create(Schema.Type.NULL).toString, path.get, tblName,
         mapAsJavaMap(parameters))).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
-      if (asyncCompactionTriggerFn.isDefined &&
-        isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
+      if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
         asyncCompactionTriggerFn.get.apply(client)
       }
@@ -441,7 +441,7 @@ private[hudi] object HoodieSparkSqlWriter {
                                        tableConfig: HoodieTableConfig,
                                        parameters: Map[String, String], configuration: Configuration) : Boolean = {
     log.info(s"Config.isInlineCompaction ? ${client.getConfig.isInlineCompaction}")
-    if (!client.getConfig.isInlineCompaction
+    if (asyncCompactionTriggerFnDefined && !client.getConfig.isInlineCompaction
       && parameters.get(ASYNC_COMPACT_ENABLE_OPT_KEY).exists(r => r.toBoolean)) {
       tableConfig.getTableType == HoodieTableType.MERGE_ON_READ
     } else {
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index 8995b7c..bcd83db 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -22,17 +22,29 @@ import java.util.{Date, UUID}
 import org.apache.commons.io.FileUtils
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.client.HoodieWriteClient
+import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.keygen.SimpleKeyGenerator
 import org.apache.hudi.testutils.DataSourceTestUtils
-import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
-import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils}
+import org.apache.spark.SparkContext
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession}
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.{FunSuite, Matchers}
+import scala.collection.JavaConversions._
+
 class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
+  var spark: SparkSession = _
+  var sc: SparkContext = _
+  var sqlContext: SQLContext = _
+
   test("Parameters With Write Defaults") {
     val originals = HoodieWriterUtils.parametersWithWriteDefaults(Map.empty)
     val rhsKey = "hoodie.right.hand.side.key"
@@ -65,15 +77,10 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
   test("throw hoodie exception when there already exist a table with different name with Append Save mode") {
-    val session = SparkSession.builder()
-      .appName("test_append_mode")
-      .master("local[2]")
-      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .getOrCreate()
+    initSparkContext("test_append_mode")
     val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
     try {
-      val sqlContext = session.sqlContext
       val hoodieFooTableName = "hoodie_foo_tbl"
       //create a new table
@@ -82,7 +89,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
         "hoodie.insert.shuffle.parallelism" -> "4",
         "hoodie.upsert.shuffle.parallelism" -> "4")
       val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
-      val dataFrame = session.createDataFrame(Seq(Test(UUID.randomUUID().toString, new Date().getTime)))
+      val dataFrame = spark.createDataFrame(Seq(Test(UUID.randomUUID().toString, new Date().getTime)))
       HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame)
       //on same path try append with different("hoodie_bar_tbl") table name which should throw an exception
@@ -91,7 +98,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
         "hoodie.insert.shuffle.parallelism" -> "4",
         "hoodie.upsert.shuffle.parallelism" -> "4")
       val barTableParams = HoodieWriterUtils.parametersWithWriteDefaults(barTableModifier)
-      val dataFrame2 = session.createDataFrame(Seq(Test(UUID.randomUUID().toString, new Date().getTime)))
+      val dataFrame2 = spark.createDataFrame(Seq(Test(UUID.randomUUID().toString, new Date().getTime)))
       val tableAlreadyExistException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, barTableParams, dataFrame2))
       assert(tableAlreadyExistException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
@@ -100,22 +107,16 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       val deleteCmdException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, deleteTableParams, dataFrame2))
       assert(deleteCmdException.getMessage.contains("hoodie table with name " + hoodieFooTableName + " already exist"))
     } finally {
-      session.stop()
+      spark.stop()
       FileUtils.deleteDirectory(path.toFile)
     }
   }
   test("test bulk insert dataset with datasource impl") {
-    val session = SparkSession.builder()
-      .appName("test_bulk_insert_datasource")
-      .master("local[2]")
-      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .getOrCreate()
+    initSparkContext("test_bulk_insert_datasource")
     val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
     try {
-      val sqlContext = session.sqlContext
-      val sc = session.sparkContext
       val hoodieFooTableName = "hoodie_foo_tbl"
       //create a new table
@@ -134,7 +135,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
       val records = DataSourceTestUtils.generateRandomRows(100)
       val recordsSeq = convertRowListToSeq(records)
-      val df = session.createDataFrame(sc.parallelize(recordsSeq), structType)
+      val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
       // write to Hudi
       HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
@@ -148,7 +149,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       }
       // fetch all records from parquet files generated from write to hudi
-      val actualDf = session.sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+      val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
       // remove metadata columns so that expected and actual DFs can be compared as is
       val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
@@ -157,22 +158,16 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       assert(df.except(trimmedDf).count() == 0)
     } finally {
-      session.stop()
+      spark.stop()
       FileUtils.deleteDirectory(path.toFile)
     }
   }
   test("test bulk insert dataset with datasource impl multiple rounds") {
-    val session = SparkSession.builder()
-      .appName("test_bulk_insert_datasource")
-      .master("local[2]")
-      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .getOrCreate()
+    initSparkContext("test_bulk_insert_datasource")
    val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
     try {
-      val sqlContext = session.sqlContext
-      val sc = session.sparkContext
       val hoodieFooTableName = "hoodie_foo_tbl"
       //create a new table
@@ -194,18 +189,18 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
       val schema = DataSourceTestUtils.getStructTypeExampleSchema
       val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
-      var totalExpectedDf = session.createDataFrame(sc.emptyRDD[Row], structType)
+      var totalExpectedDf = spark.createDataFrame(sc.emptyRDD[Row], structType)
       for (_ <- 0 to 2) {
         // generate the inserts
         val records = DataSourceTestUtils.generateRandomRows(200)
         val recordsSeq = convertRowListToSeq(records)
-        val df = session.createDataFrame(sc.parallelize(recordsSeq), structType)
+        val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
         // write to Hudi
         HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df)
         // Fetch records from entire dataset
-        val actualDf = session.sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+        val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
         // remove metadata columns so that expected and actual DFs can be compared as is
         val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
@@ -218,11 +213,78 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
         assert(totalExpectedDf.except(trimmedDf).count() == 0)
       }
     } finally {
-      session.stop()
+      spark.stop()
       FileUtils.deleteDirectory(path.toFile)
     }
   }
+  List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+    .foreach(tableType => {
+      test("test basic HoodieSparkSqlWriter functionality with datasource insert for " + tableType) {
+        initSparkContext("test_insert_datasource")
+        val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+        try {
+
+          val hoodieFooTableName = "hoodie_foo_tbl"
+
+          //create a new table
+          val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+            HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+            DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> tableType,
+            HoodieWriteConfig.INSERT_PARALLELISM -> "4",
+            DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+            DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+            DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+            DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[SimpleKeyGenerator].getCanonicalName)
+          val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+
+          // generate the inserts
+          val schema = DataSourceTestUtils.getStructTypeExampleSchema
+          val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+          val records = DataSourceTestUtils.generateRandomRows(100)
+          val recordsSeq = convertRowListToSeq(records)
+          val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+
+          val client = spy(DataSourceUtils.createHoodieClient(
+            new JavaSparkContext(sc),
+            schema.toString,
+            path.toAbsolutePath.toString,
+            hoodieFooTableName,
+            mapAsJavaMap(fooTableParams)).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]])
+
+          // write to Hudi
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df, Option.empty,
+            Option(client))
+          // Verify that asynchronous compaction is not scheduled
+          verify(client, times(0)).scheduleCompaction(any())
+          // Verify that HoodieWriteClient is closed correctly
+          verify(client, times(1)).close()
+
+          // collect all partition paths to issue read of parquet files
+          val partitions = Seq(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+            HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
+          // Check the entire dataset has all records still
+          val fullPartitionPaths = new Array[String](3)
+          for (i <- fullPartitionPaths.indices) {
+            fullPartitionPaths(i) = String.format("%s/%s/*", path.toAbsolutePath.toString, partitions(i))
+          }
+
+          // fetch all records from parquet files generated from write to hudi
+          val actualDf = sqlContext.read.parquet(fullPartitionPaths(0), fullPartitionPaths(1), fullPartitionPaths(2))
+
+          // remove metadata columns so that expected and actual DFs can be compared as is
+          val trimmedDf = actualDf.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
+            .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
+            .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
+
+          assert(df.except(trimmedDf).count() == 0)
+        } finally {
+          spark.stop()
+          FileUtils.deleteDirectory(path.toFile)
+        }
+      }
+    })
+
   case class Test(uuid: String, ts: Long)
   import scala.collection.JavaConverters
@@ -230,4 +292,14 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
   def convertRowListToSeq(inputList: util.List[Row]): Seq[Row] =
     JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
+  def initSparkContext(appName: String): Unit = {
+    spark = SparkSession.builder()
+      .appName(appName)
+      .master("local[2]")
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .getOrCreate()
+    sc = spark.sparkContext
+    sc.setLogLevel("ERROR")
+    sqlContext = spark.sqlContext
+  }
 }
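
For readers skimming the diff, the essence of the fix is the extra asyncCompactionTriggerFnDefined condition in isAsyncCompactionEnabled: a batch write launched via spark-submit never supplies an async-compaction trigger function (only the streaming sink does), so treating async compaction as enabled left the write client waiting on a compaction that would never be triggered and the job hung instead of exiting. Below is a minimal, standalone Scala sketch of that guard; it is not the Hudi source, and the WriteConfig/TableType stand-ins and the literal option key are illustrative only.

// Standalone sketch of the guard added in this commit (illustrative, not Hudi code).
object AsyncCompactionGuardSketch {

  sealed trait TableType
  case object MergeOnRead extends TableType
  case object CopyOnWrite extends TableType

  // Stand-in for HoodieWriteConfig; only the flag this guard reads.
  final case class WriteConfig(isInlineCompaction: Boolean)

  // Stand-in literal for DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY.
  val AsyncCompactEnableKey = "hoodie.datasource.compaction.async.enable"

  // Mirrors the patched isAsyncCompactionEnabled: async compaction only counts as enabled
  // when a trigger function was supplied, inline compaction is off, the option is true,
  // and the table is MERGE_ON_READ.
  def isAsyncCompactionEnabled(asyncCompactionTriggerFnDefined: Boolean,
                               config: WriteConfig,
                               tableType: TableType,
                               parameters: Map[String, String]): Boolean = {
    if (asyncCompactionTriggerFnDefined && !config.isInlineCompaction
      && parameters.get(AsyncCompactEnableKey).exists(_.toBoolean)) {
      tableType == MergeOnRead
    } else {
      false
    }
  }

  def main(args: Array[String]): Unit = {
    val params = Map(AsyncCompactEnableKey -> "true")
    val cfg = WriteConfig(isInlineCompaction = false)
    // Streaming path: trigger function supplied -> async compaction stays on for MOR tables.
    println(isAsyncCompactionEnabled(asyncCompactionTriggerFnDefined = true, cfg, MergeOnRead, params))
    // Batch spark-submit path: no trigger function -> false, so the client is closed and the job exits.
    println(isAsyncCompactionEnabled(asyncCompactionTriggerFnDefined = false, cfg, MergeOnRead, params))
  }
}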