This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b245aa7098 Fix race condition in HoodieSparkSqlWriter (#9749)
2b245aa7098 is described below

commit 2b245aa709852f8023a703bd4d574b0491f2c96b
Author: StreamingFlames <18889897...@163.com>
AuthorDate: Sun Oct 22 05:24:16 2023 -0500

    Fix race condition in HoodieSparkSqlWriter (#9749)
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 47 ++++++++++++++++++++--
 .../TestSparkDataSourceDAGExecution.scala          |  2 +-
 2 files changed, 44 insertions(+), 5 deletions(-)
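
For context on the diff below: HoodieSparkSqlWriter is a Scala object, i.e. a JVM-wide singleton, yet it held per-write mutable fields such as tableExists and asyncCompactionTriggerFnDefined, so two writes running concurrently in the same SparkSession could overwrite each other's flags. The fix keeps the object as a stateless facade and moves the stateful logic into a new HoodieSparkSqlWriterInternal class that is instantiated once per call. A minimal sketch of the hazard and the pattern (illustrative names only, not Hudi's actual code):

    // Before: a singleton holding per-write state, shared by every thread.
    object RacyWriter {
      private var tableExists: Boolean = false // one field for all writers

      def write(path: String): Unit = {
        tableExists = path.nonEmpty      // thread A's result can be
        if (!tableExists) {              // overwritten by thread B before
          println(s"initializing $path") // this check runs
        }
      }
    }

    // After: the singleton stays stateless and delegates to a fresh
    // instance per call, confining the flag to a single write.
    object SafeWriter {
      def write(path: String): Unit = new SafeWriterInternal().write(path)
    }

    class SafeWriterInternal {
      private var tableExists: Boolean = false // per-invocation state

      def write(path: String): Unit = {
        tableExists = path.nonEmpty
        if (!tableExists) println(s"initializing $path")
      }
    }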

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 581cf2bdc69..49ee4c4b670 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -27,6 +27,7 @@ import org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTable
 import org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
+import org.apache.hudi.HoodieSparkSqlWriter.{CANONICALIZE_NULLABLE, SQL_MERGE_INTO_WRITES, StreamingWriteParams}
 import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.avro.AvroSchemaUtils.{canProject, isCompatibleProjectionOf, isSchemaCompatible, resolveNullableSchema}
 import org.apache.hudi.avro.HoodieAvroUtils
@@ -110,6 +111,48 @@ object HoodieSparkSqlWriter {
    */
   val SPARK_STREAMING_BATCH_ID = "hoodie.internal.spark.streaming.batch.id"
 
+  def write(sqlContext: SQLContext,
+            mode: SaveMode,
+            optParams: Map[String, String],
+            sourceDf: DataFrame,
+            streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
+            hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty):
+  (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
+    new HoodieSparkSqlWriterInternal().write(sqlContext, mode, optParams, sourceDf, streamingWritesParamsOpt, hoodieWriteClient)
+  }
+
+  def bootstrap(sqlContext: SQLContext,
+                mode: SaveMode,
+                optParams: Map[String, String],
+                df: DataFrame,
+                hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
+                streamingWritesParamsOpt: Option[StreamingWriteParams] = Option.empty,
+                hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty): Boolean = {
+    new HoodieSparkSqlWriterInternal().bootstrap(sqlContext, mode, optParams, df, hoodieTableConfigOpt, streamingWritesParamsOpt, hoodieWriteClient)
+  }
+
+  /**
+   * Deduces writer's schema based on
+   * <ul>
+   * <li>Source's schema</li>
+   * <li>Target table's schema (including Hudi's [[InternalSchema]] representation)</li>
+   * </ul>
+   */
+  def deduceWriterSchema(sourceSchema: Schema,
+                         latestTableSchemaOpt: Option[Schema],
+                         internalSchemaOpt: Option[InternalSchema],
+                         opts: Map[String, String]): Schema = {
+    new HoodieSparkSqlWriterInternal().deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, opts)
+  }
+
+  def cleanup(): Unit = {
+    Metrics.shutdownAllMetrics()
+  }
+
+}
+
+class HoodieSparkSqlWriterInternal {
+
   private val log = LoggerFactory.getLogger(getClass)
   private var tableExists: Boolean = false
   private var asyncCompactionTriggerFnDefined: Boolean = false
@@ -949,10 +992,6 @@ object HoodieSparkSqlWriter {
     }
   }
 
-  def cleanup() : Unit = {
-    Metrics.shutdownAllMetrics()
-  }
-
  private def handleSaveModes(spark: SparkSession, mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
                              operation: WriteOperationType, fs: FileSystem): Unit = {
     if (mode == SaveMode.Append && tableExists) {
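
The caller-facing API above is unchanged: HoodieSparkSqlWriter.write keeps its signature and simply constructs a fresh internal writer per invocation. A usage sketch against that signature (the tuple binding names are illustrative, and opts/df are assumed to be the write options and source DataFrame):

    // Each call below runs on its own HoodieSparkSqlWriterInternal instance.
    val (success, instantTime, compactionInstant, clusteringInstant, client, tableConfig) =
      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, opts, df)
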
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala
index 52e1ae812c9..15b4cda243d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala
@@ -89,7 +89,7 @@ class TestSparkDataSourceDAGExecution extends HoodieSparkClientTestBase with Sca
   @CsvSource(Array(
     "upsert,org.apache.hudi.client.SparkRDDWriteClient.commit",
     "insert,org.apache.hudi.client.SparkRDDWriteClient.commit",
-    "bulk_insert,org.apache.hudi.HoodieSparkSqlWriter$.bulkInsertAsRow"))
+    "bulk_insert,org.apache.hudi.HoodieSparkSqlWriterInternal.bulkInsertAsRow"))
  def testWriteOperationDoesNotTriggerRepeatedDAG(operation: String, event: String): Unit = {
     // register stage event listeners
     val stageListener = new StageListener(event)
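
One note on the test change: Scala compiles the methods of a companion object into a synthetic JVM class whose name carries a trailing $, which is why the stage listener previously matched HoodieSparkSqlWriter$.bulkInsertAsRow. With bulkInsertAsRow moved into a plain class, the owner name it matches has no $ suffix. Sketch with illustrative names:

    object Facade { def run(): Unit = () } // methods live on JVM class Facade$
    class Worker  { def run(): Unit = () } // methods live on JVM class Worker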
