bersprockets commented on code in PR #48325:
URL: https://github.com/apache/spark/pull/48325#discussion_r1854207665
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala:
##########
@@ -43,16 +43,85 @@ class SQLExecutionRDD(
  override def getPartitions: Array[Partition] =
    firstParent[InternalRow].partitions
+  /**
+   * This is a wrapper for the iterator obtained in compute. It ensures that the
+   * SQLConf is set on the executor side at the time the iterator is actually being
+   * used.
+   *
+   * There could be multiple LocalIterators running in a task (e.g., zip of two SQLExecutionRDDs).
+   * Therefore, when this LocalIterator attempts to set the conf, it may already have been
+   * set by another LocalIterator. We keep a thread local flag to indicate whether the conf
+   * has been set. If the flag is already set, this LocalIterator will not attempt to set
+   * the conf. Otherwise, it will set the conf at the first call to either hasNext() or next().
+   *
+   * The LocalIterator that actually set the conf will be responsible for putting back the old
+   * conf when the task completes.
+   *
+   * @param it The iterator to wrap
+   */
+  class LocalIterator(it: Iterator[InternalRow]) extends Iterator[InternalRow] {
+    private var initialized = false // iterator-local cache of SQLExecutionRDD.confInit
+    private var oldConfOpt: Option[SQLConf] = None // old conf
+
+    def initialize(): Unit = {
+      if (!SQLExecutionRDD.confInit.get) {
+        // set the desired executor side conf and save the old conf
+        oldConfOpt = SQLConf.set(Some(sqlConfExecutorSide))
+        // indicate locally that we're initialized
+        initialized = true
+        // indicate to other LocalIterators in the task that we are initialized
+        SQLExecutionRDD.confInit.set(true)
+
+        // we need to know when the task completes so we can put back the old conf
+        TaskContext.get().addTaskCompletionListener[Unit] { _ =>
+          if (initialized) {
+            if (SQLExecutionRDD.confInit.get) {
+              SQLConf.set(oldConfOpt)
+              initialized = false
+              SQLExecutionRDD.confInit.set(false)
+            } else {
+              initialized = false
+            }
+          }
+        }
+      } else {
+        // some other LocalIterator in this task has initialized the conf.
+        // Simply flag locally that we're initialized
+        initialized = true
+      }
+    }
+
+    override def next(): InternalRow = {
+      if (!initialized) {
+        initialize()
+      }
+      it.next()
+    }
+
+    override def hasNext: Boolean = {
+      if (!initialized) {
+        initialize()
+      }
+      it.hasNext
+    }
+  }
+
  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    // If we are in the context of a tracked SQL operation, `SQLExecution.EXECUTION_ID_KEY` is set
    // and we have nothing to do here. Otherwise, we use the `SQLConf` captured at the creation of
    // this RDD.
    if (context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) == null) {
      SQLConf.withExistingConf(sqlConfExecutorSide) {
Review Comment:
>It's not the case for the query you hit?
There are several queries like that.
Some executor-side code consults `SQLConf` only after
`SQLExecutionRDD#compute` completes (e.g., `JacksonParser`, which looks at the
current setting of `jsonEnablePartialResults`).
I added some debug print statements to demo this case:
```
scala> val df = Seq("{'data': {'a': [19], 'b': 123456}}").toDF("str").cache()
val df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [str: string]
scala> val query = df.selectExpr("from_json(str, 'data struct<a: int, b: int>')")
val query: org.apache.spark.sql.DataFrame = [from_json(str): struct<data: struct<a: int, b: int>>]
scala> query.rdd.collect()(0)
>>> Executor task launch worker for task 0.0 in stage 0.0 (TID 0) Leaving SQLExecutionRDD#compute
>>> Executor task launch worker for task 0.0 in stage 0.0 (TID 0) Constructing JacksonParser
>>> Executor task launch worker for task 0.0 in stage 0.0 (TID 0) JacksonParser consulting SQLConf!
val res0: org.apache.spark.sql.Row = [[[null,123456]]]
scala>
```
Note that the message "Leaving SQLExecutionRDD#compute" precedes
"JacksonParser consulting SQLConf!". This consultation will only ever see
default settings.
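To make the mechanics concrete, here is a minimal, Spark-free sketch of the same failure mode. The names `conf` and `withConf` are stand-ins for the thread-local `SQLConf` and `SQLConf.withExistingConf`, not Spark APIs: the value is installed only while the iterator is being built, and the lazy `map` runs after the old value has already been restored.
```
// `conf` stands in for the thread-local SQLConf; `withConf` for withExistingConf.
val conf = new ThreadLocal[String] { override def initialValue(): String = "default" }

def withConf[T](value: String)(body: => T): T = {
  val old = conf.get()
  conf.set(value)
  try body finally conf.set(old)
}

// "compute": builds a lazy iterator under the desired conf, then returns it.
val rows: Iterator[String] = withConf("driver-side setting") {
  Iterator(1, 2).map(i => s"row $i parsed with conf=${conf.get()}")
}

// Consumption happens after withConf has restored the old value, so the
// lazily-applied map sees "default", not "driver-side setting".
rows.foreach(println)
```
`LocalIterator` closes exactly this gap by deferring the conf install to the first `hasNext()`/`next()` call and restoring the old conf in a task-completion listener.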
Another example is the `TimestampFormatter` object (which looks at the
current setting of `legacyTimeParserPolicy`):
```
scala> val test = {
  spark
    .read
    .option("header", "false")
    .schema("id int, ts timestamp")
    .csv("csvfile")
}
>>> main getFormatter consulting SQLConf
>>> main getFormatter consulting SQLConf
val test: org.apache.spark.sql.DataFrame = [id: int, ts: timestamp]
scala> test.rdd.collect()(0)
>>> Executor task launch worker for task 0.0 in stage 0.0 (TID 0) Leaving SQLExecutionRDD#compute
>>> Executor task launch worker for task 0.0 in stage 0.0 (TID 0) getFormatter consulting SQLConf
val res0: org.apache.spark.sql.Row = [1,2021-11-22 11:27:01.0]
scala>
```
Again, note that the message "Leaving SQLExecutionRDD#compute" precedes
"getFormatter consulting SQLConf".
Edit: Both of the above cases result in correctness issues. The correctness issue described in the PR description is due to the first case. Here's a correctness issue due to the second case:
```
val data = Seq("1,2021-11-22 11:27:01").toDF()
data.write.mode("overwrite").text("csvfile")
sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
val test = {
  spark
    .read
    .option("header", "false")
    .schema("id int, ts timestamp")
    .csv("csvfile")
}
test.collect()(0)
test.rdd.collect()(0)
```
The first collect returns
```
[1,2021-11-22 11:27:01.0]
```
The second returns
```
[1,null]
```
The timestamp is null due to a mismatch in the value of `spark.sql.legacy.timeParserPolicy` between the driver and the executor.
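If it helps, here is a rough way to watch the executor-side value while the rows are actually being consumed. This is a sketch, not part of the PR; it reuses `test` from the snippet above and reads the thread-local `SQLConf`, which is roughly what `getFormatter` consults. Without the fix it should report a policy other than `LEGACY`, consistent with the `[1,null]` result.
```
import org.apache.spark.sql.internal.SQLConf

// Read the conf lazily, per row, so the check runs while the wrapped iterator
// is being consumed, i.e. after SQLExecutionRDD#compute has already returned.
test.rdd.mapPartitions { iter =>
  iter.map { row =>
    val policy = SQLConf.get.getConf(SQLConf.LEGACY_TIME_PARSER_POLICY).toString
    s"$row seen with timeParserPolicy=$policy"
  }
}.collect().foreach(println)
```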