cloud-fan commented on code in PR #48325:
URL: https://github.com/apache/spark/pull/48325#discussion_r1886092659


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala:
##########
@@ -43,16 +43,85 @@ class SQLExecutionRDD(
 
   override def getPartitions: Array[Partition] = firstParent[InternalRow].partitions
 
+  /**
+   * This is a wrapper for the iterator obtained in compute. It ensures that the
+   * SQLConf is set on the executor side at the time the iterator is actually being
+   * used.
+   *
+   * There could be multiple LocalIterators running in a task (e.g., zip of two SQLExecutionRDDs).
+   * Therefore, when this LocalIterator attempts to set the conf, it may already have been
+   * set by another LocalIterator. We keep a thread local flag to indicate whether the conf
+   * has been set. If the flag is already set, this LocalIterator will not attempt to set
+   * the conf. Otherwise, it will set the conf at the first call to either hasNext() or next().
+   *
+   * The LocalIterator that actually set the conf will be responsible for putting back the old
+   * conf when the task completes.
+   *
+   * @param it The iterator to wrap
+   */
+  class LocalIterator(it: Iterator[InternalRow]) extends Iterator[InternalRow] {
+    private var initialized = false // iterator-local cache of SQLExecutionRDD.confInit
+    private var oldConfOpt: Option[SQLConf] = None // old conf
+
+    def initialize(): Unit = {
+      if (!SQLExecutionRDD.confInit.get) {
+        // set the desired executor side conf and save the old conf
+        oldConfOpt = SQLConf.set(Some(sqlConfExecutorSide))
+        // indicate locally that we're initialized
+        initialized = true
+        // indicate to other LocalIterators in the task that we are initialized
+        SQLExecutionRDD.confInit.set(true)
+
+        // we need to know when the task completes so we can put back the old conf
+        TaskContext.get().addTaskCompletionListener[Unit] { _ =>
+          if (initialized) {
+            if (SQLExecutionRDD.confInit.get) {
+              SQLConf.set(oldConfOpt)
+              initialized = false
+              SQLExecutionRDD.confInit.set(false)
+            } else {
+              initialized = false
+            }
+          }
+        }
+      } else {
+        // some other LocalIterator in this task has initialized the conf.
+        // Simply flag locally that we're initialized
+        initialized = true
+      }
+    }
+
+    override def next(): InternalRow = {
+      if (!initialized) {
+        initialize()
+      }
+      it.next()
+    }
+
+    override def hasNext: Boolean = {
+      if (!initialized) {
+        initialize()
+      }
+      it.hasNext
+    }
+  }
+
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     // If we are in the context of a tracked SQL operation, `SQLExecution.EXECUTION_ID_KEY` is set
     // and we have nothing to do here. Otherwise, we use the `SQLConf` captured at the creation of
     // this RDD.
     if (context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) == null) {
       SQLConf.withExistingConf(sqlConfExecutorSide) {

Review Comment:
   I think it's better to centralize the conf accesses. Setting up the SQLConf
thread-local and wrapping every piece of code that needs to read it is fragile.
   
   I took a look at one example, `CSVHeaderChecker`: `caseSensitive` should be
passed in as a parameter, which forces the caller to think about how to access
the SQLConf.
   
   If we are not able to fix these cases, we can still come back and merge this
PR.
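
   A minimal sketch of the parameter-passing approach suggested above. All names
here (`ThreadLocalConf`, `HeaderMatch`, `ImplicitHeaderChecker`,
`ExplicitHeaderChecker`) are hypothetical stand-ins for illustration; this is
not Spark's actual `CSVHeaderChecker` or `SQLConf` API.

```scala
// Hypothetical stand-in for a thread-local conf (illustration only, not Spark's SQLConf).
object ThreadLocalConf {
  private val caseSensitiveFlag = new ThreadLocal[Boolean] {
    override def initialValue(): Boolean = false
  }
  def caseSensitive: Boolean = caseSensitiveFlag.get()
  def setCaseSensitive(value: Boolean): Unit = caseSensitiveFlag.set(value)
}

// Comparison logic shared by both checkers below.
object HeaderMatch {
  def sameHeader(expected: Seq[String], actual: Seq[String], caseSensitive: Boolean): Boolean =
    expected.length == actual.length && expected.zip(actual).forall { case (e, a) =>
      if (caseSensitive) e == a else e.equalsIgnoreCase(a)
    }
}

// Fragile variant: the result depends on whatever conf the current thread holds
// at the moment `matches` is called.
class ImplicitHeaderChecker(expected: Seq[String]) {
  def matches(actual: Seq[String]): Boolean =
    HeaderMatch.sameHeader(expected, actual, ThreadLocalConf.caseSensitive)
}

// Explicit variant: the caller resolves the setting once and passes it in,
// so the checker no longer touches thread-local state.
class ExplicitHeaderChecker(expected: Seq[String], caseSensitive: Boolean) {
  def matches(actual: Seq[String]): Boolean =
    HeaderMatch.sameHeader(expected, actual, caseSensitive)
}
```

   With the explicit variant, the call site has to decide where the value comes
from (for example, reading it once where the conf is known to be correct), and
the checker behaves the same on any thread.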


