bersprockets commented on code in PR #48325:
URL: https://github.com/apache/spark/pull/48325#discussion_r1854207665


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala:
##########
@@ -43,16 +43,85 @@ class SQLExecutionRDD(
 
   override def getPartitions: Array[Partition] = firstParent[InternalRow].partitions
 
+  /**
+   * This is a wrapper for the iterator obtained in compute. It ensures that the
+   * SQLConf is set on the executor side at the time the iterator is actually being
+   * used.
+   *
+   * There could be multiple LocalIterators running in a task (e.g., a zip of two
+   * SQLExecutionRDDs). Therefore, when this LocalIterator attempts to set the conf,
+   * it may already have been set by another LocalIterator. We keep a thread-local
+   * flag to indicate whether the conf has been set. If the flag is already set, this
+   * LocalIterator will not attempt to set the conf. Otherwise, it will set the conf
+   * at the first call to either hasNext() or next().
+   *
+   * The LocalIterator that actually set the conf is responsible for putting back the
+   * old conf when the task completes.
+   *
+   * @param it The iterator to wrap
+   */
+  class LocalIterator(it: Iterator[InternalRow]) extends Iterator[InternalRow] {
+    private var initialized = false // iterator-local cache of SQLExecutionRDD.confInit
+    private var oldConfOpt: Option[SQLConf] = None // the conf to restore when the task completes
+
+    def initialize(): Unit = {
+      if (!SQLExecutionRDD.confInit.get) {
+        // set the desired executor side conf and save the old conf
+        oldConfOpt = SQLConf.set(Some(sqlConfExecutorSide))
+        // indicate locally that we're initialized
+        initialized = true
+        // indicate to other LocalIterators in the task that we are initialized
+        SQLExecutionRDD.confInit.set(true)
+
+        // we need to know when the task completes so we can put back the old conf
+        TaskContext.get().addTaskCompletionListener[Unit] { _ =>
+          if (initialized) {
+            if (SQLExecutionRDD.confInit.get) {
+              SQLConf.set(oldConfOpt)
+              initialized = false
+              SQLExecutionRDD.confInit.set(false)
+            } else {
+              initialized = false
+            }
+          }
+        }
+      } else {
+        // some other LocalIterator in this task has initialized the conf.
+        // Simply flag locally that we're initialized
+        initialized = true
+      }
+    }
+
+    override def next(): InternalRow = {
+      if (!initialized) {
+        initialize()
+      }
+      it.next()
+    }
+
+    override def hasNext: Boolean = {
+      if (!initialized) {
+        initialize()
+      }
+      it.hasNext
+    }
+  }
+
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     // If we are in the context of a tracked SQL operation, `SQLExecution.EXECUTION_ID_KEY` is set
     // and we have nothing to do here. Otherwise, we use the `SQLConf` captured at the creation of
     // this RDD.
     if (context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) == null) {
       SQLConf.withExistingConf(sqlConfExecutorSide) {

Review Comment:
   > It's not the case for the query you hit?
   
   There are several queries like that.
   
   Some executor-side code consults `SQLConf` only after `SQLExecutionRDD#compute` completes (e.g., `JacksonParser`, which looks at the current setting of `jsonEnablePartialResults`).
   
   I added some debug print statements to demo this case:
   ```
   scala> val df = Seq("{'data': {'a': [19], 'b': 123456}}").toDF("str").cache()
   val df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [str: string]
   
   scala> val query = df.selectExpr("from_json(str, 'data struct<a: int, b: int>')")
   val query: org.apache.spark.sql.DataFrame = [from_json(str): struct<data: struct<a: int, b: int>>]
   
   scala> query.rdd.collect()(0)
   >>> Executor task launch worker for task 0.0 in stage 0.0 (TID 0) Leaving SQLExecutionRDD#compute
   >>> Executor task launch worker for task 0.0 in stage 0.0 (TID 0) Constructing JacksonParser
   >>> Executor task launch worker for task 0.0 in stage 0.0 (TID 0) JacksonParser consulting SQLConf!
   val res0: org.apache.spark.sql.Row = [[[null,123456]]]
   
   scala>
   ```
   Note that the message "Leaving SQLExecutionRDD#compute" precedes "JacksonParser consulting SQLConf!". This consultation will only ever see default settings.
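   
   To make the timing concrete, here's a minimal standalone sketch (mine, not Spark's internals) of the same effect, with `scala.util.DynamicVariable` standing in for the thread-local `SQLConf` and `withValue` playing the role of `SQLConf.withExistingConf`:
   ```
   import scala.util.DynamicVariable
   
   object LazyConfDemo {
     // stand-in for the thread-local SQLConf
     val conf = new DynamicVariable[String]("default")
   
     def compute(): Iterator[String] =
       conf.withValue("executor-side") {
         // The iterator is *built* inside the block, but conf.value is only
         // *read* at each next() call, after withValue has already restored
         // the previous value.
         Iterator.continually(conf.value).take(1)
       }
   
     def main(args: Array[String]): Unit =
       println(compute().next()) // prints "default", not "executor-side"
   }
   ```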
   
   Another example is the `TimestampFormatter` object (which looks at the current setting of `legacyTimeParserPolicy`):
   ```
   scala> val test = {
     spark
     .read
     .option("header", "false")
     .schema("id int, ts timestamp")
     .csv("csvfile")
   }
   >>> main getFormatter consulting SQLConf
   >>> main getFormatter consulting SQLConf
   val test: org.apache.spark.sql.DataFrame = [id: int, ts: timestamp]
   
   scala> test.rdd.collect()(0)
   >>> Executor task launch worker for task 0.0 in stage 0.0 (TID 0) Leaving SQLExecutionRDD#compute
   >>> Executor task launch worker for task 0.0 in stage 0.0 (TID 0) getFormatter consulting SQLConf
   val res0: org.apache.spark.sql.Row = [1,2021-11-22 11:27:01.0]
   
   scala> 
   ```
   Again, note that the message "Leaving SQLExecutionRDD#compute" precedes "getFormatter consulting SQLConf".
   
   Edit: Both of the above cases result in correctness issues. The one described in the PR description is due to the first case. Here's a correctness issue due to the second case:
   ```
   val data = Seq("1,2021-11-22 11:27:01").toDF()
   data.write.mode("overwrite").text("csvfile")
   
   sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
   
   val test = {
     spark
     .read
     .option("header", "false")
     .schema("id int, ts timestamp")
     .csv("csvfile")
   }
   
   test.collect()(0)
   test.rdd.collect()(0)
   ```
   The first collect returns
   ```
   [1,2021-11-22 11:27:01.0]
   ```
   The second returns
   ```
   [1,null]
   ```
   The timestamp is null due to a mismatch in the value of `spark.sql.legacy.timeParserPolicy` between the driver and the executor.
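   
   For reference, the core of the fix in the diff above is deferring the conf installation from `compute` to the first `hasNext()`/`next()` call. Here's a condensed sketch of just that pattern (the thread-local `confInit` coordination and the task-completion listener that restores the old conf are omitted):
   ```
   // Simplified: the real LocalIterator also coordinates with other
   // LocalIterators in the same task and restores the old conf on completion.
   class DeferredConfIterator[T](it: Iterator[T], installConf: () => Unit)
       extends Iterator[T] {
     private var initialized = false
   
     private def ensureConf(): Unit = {
       if (!initialized) {
         installConf() // install the executor-side conf lazily, at first use
         initialized = true
       }
     }
   
     override def hasNext: Boolean = { ensureConf(); it.hasNext }
     override def next(): T = { ensureConf(); it.next() }
   }
   ```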




