This is an automated email from the ASF dual-hosted git repository. vbalaji pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 2b245aa7098 Fix race condition in HoodieSparkSqlWriter (#9749) 2b245aa7098 is described below commit 2b245aa709852f8023a703bd4d574b0491f2c96b Author: StreamingFlames <18889897...@163.com> AuthorDate: Sun Oct 22 05:24:16 2023 -0500 Fix race condition in HoodieSparkSqlWriter (#9749) --- .../org/apache/hudi/HoodieSparkSqlWriter.scala | 47 ++++++++++++++++++++-- .../TestSparkDataSourceDAGExecution.scala | 2 +- 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 581cf2bdc69..49ee4c4b670 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -27,6 +27,7 @@ import org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTable import org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption} +import org.apache.hudi.HoodieSparkSqlWriter.{CANONICALIZE_NULLABLE, SQL_MERGE_INTO_WRITES, StreamingWriteParams} import org.apache.hudi.HoodieWriterUtils._ import org.apache.hudi.avro.AvroSchemaUtils.{canProject, isCompatibleProjectionOf, isSchemaCompatible, resolveNullableSchema} import org.apache.hudi.avro.HoodieAvroUtils @@ -110,6 +111,48 @@ object HoodieSparkSqlWriter { */ val SPARK_STREAMING_BATCH_ID = "hoodie.internal.spark.streaming.batch.id" + def write(sqlContext: SQLContext, + mode: SaveMode, + optParams: Map[String, String], + sourceDf: DataFrame, + streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty, + hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty): + (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = { + new HoodieSparkSqlWriterInternal().write(sqlContext, mode, optParams, sourceDf, streamingWritesParamsOpt, hoodieWriteClient) + } + + def bootstrap(sqlContext: SQLContext, + mode: SaveMode, + optParams: Map[String, String], + df: DataFrame, + hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty, + streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty, + hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty): Boolean = { + new HoodieSparkSqlWriterInternal().bootstrap(sqlContext, mode, optParams, df, hoodieTableConfigOpt, streamingWritesParamsOpt, hoodieWriteClient) + } + + /** + * Deduces writer's schema based on + * <ul> + * <li>Source's schema</li> + * <li>Target table's schema (including Hudi's [[InternalSchema]] representation)</li> + * </ul> + */ + def deduceWriterSchema(sourceSchema: Schema, + latestTableSchemaOpt: Option[Schema], + internalSchemaOpt: Option[InternalSchema], + opts: Map[String, String]): Schema = { + new HoodieSparkSqlWriterInternal().deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, opts) + } + + def cleanup(): Unit = { + Metrics.shutdownAllMetrics() + } + +} + +class HoodieSparkSqlWriterInternal { + private val log = LoggerFactory.getLogger(getClass) private var tableExists: Boolean = false private var asyncCompactionTriggerFnDefined: Boolean = false @@ -949,10 +992,6 @@ object HoodieSparkSqlWriter { } } - def cleanup() : Unit = { - Metrics.shutdownAllMetrics() - } - private def handleSaveModes(spark: SparkSession, mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String, operation: WriteOperationType, fs: FileSystem): Unit = { if (mode == SaveMode.Append && tableExists) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala index 52e1ae812c9..15b4cda243d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala @@ -89,7 +89,7 @@ class TestSparkDataSourceDAGExecution extends HoodieSparkClientTestBase with Sca @CsvSource(Array( "upsert,org.apache.hudi.client.SparkRDDWriteClient.commit", "insert,org.apache.hudi.client.SparkRDDWriteClient.commit", - "bulk_insert,org.apache.hudi.HoodieSparkSqlWriter$.bulkInsertAsRow")) + "bulk_insert,org.apache.hudi.HoodieSparkSqlWriterInternal.bulkInsertAsRow")) def testWriteOperationDoesNotTriggerRepeatedDAG(operation: String, event: String): Unit = { // register stage event listeners val stageListener = new StageListener(event)