GitHub user bjkonglu opened a pull request:
https://github.com/apache/spark/pull/21718
[SPARK-24744][STRUCTRURED STREAMING] Set the SparkSession configurati…
# Background
When I used Structured Streaming to build my application, I noticed something
odd: the application always set the option `spark.sql.shuffle.partitions` to its
default value, 200. Even when I set `spark.sql.shuffle.partitions` to another
value through SparkConf or `--conf spark.sql.shuffle.partitions=100`, the setting
had no effect, and the option kept the default value.
# Analysis
I reviewed the relevant code, which is in
`org.apache.spark.sql.execution.streaming.OffsetSeqMetadata`:
```scala
/** Set the SparkSession configuration with the values in the metadata */
def setSessionConf(metadata: OffsetSeqMetadata, sessionConf: RuntimeConfig): Unit = {
  OffsetSeqMetadata.relevantSQLConfs.map(_.key).foreach { confKey =>
    metadata.conf.get(confKey) match {
      case Some(valueInMetadata) =>
        // Config value exists in the metadata, update the session config with this value
        val optionalValueInSession = sessionConf.getOption(confKey)
        if (optionalValueInSession.isDefined && optionalValueInSession.get != valueInMetadata) {
          logWarning(s"Updating the value of conf '$confKey' in current session from " +
            s"'${optionalValueInSession.get}' to '$valueInMetadata'.")
        }
        sessionConf.set(confKey, valueInMetadata)
      case None =>
        // For backward compatibility, if a config was not recorded in the offset log,
        // then log it, and let the existing conf value in SparkSession prevail.
        logWarning(s"Conf '$confKey' was not found in the offset log, using existing value")
    }
  }
}
```
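The precedence rule in `setSessionConf` can be illustrated with a minimal, Spark-free sketch. This is a simplified model, not Spark code: a plain `Map` stands in for `metadata.conf`, a mutable `Map` stands in for the session's `RuntimeConfig`, `OriginalPrecedence` and `relevantKeys` are hypothetical names, and logging is omitted.

```scala
import scala.collection.mutable

// Hypothetical, Spark-free model of the original precedence rule.
// metadataConf stands in for OffsetSeqMetadata.conf (the offset log);
// sessionConf stands in for the session's RuntimeConfig.
object OriginalPrecedence {
  // Hypothetical stand-in for the keys of OffsetSeqMetadata.relevantSQLConfs
  val relevantKeys: Seq[String] = Seq("spark.sql.shuffle.partitions")

  def setSessionConf(metadataConf: Map[String, String],
                     sessionConf: mutable.Map[String, String]): Unit = {
    relevantKeys.foreach { confKey =>
      metadataConf.get(confKey) match {
        case Some(valueInMetadata) =>
          // The value recorded in the offset log always overwrites the session value
          sessionConf(confKey) = valueInMetadata
        case None =>
          // Not recorded in the offset log: keep whatever the session already has
          ()
      }
    }
  }
}
```

With a session value of `100` and a recorded value of `200`, the session ends up with `200`, which matches the behavior described in the Background section.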
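In the original `setSessionConf`, whenever a relevant option is recorded in the
metadata (the offset log), the session value is always overwritten with the
metadata value. As a user, I want these options to be settable by the user, so I
changed the code as follows: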
```scala
/** Set the SparkSession configuration with the values in the metadata */
def setSessionConf(metadata: OffsetSeqMetadata, sessionConf: RuntimeConfig): Unit = {
  OffsetSeqMetadata.relevantSQLConfs.map(_.key).foreach { confKey =>
    metadata.conf.get(confKey) match {
      case Some(valueInMetadata) =>
        // Config value exists in the metadata; prefer a different value set by the user
        val optionalValueInSession = sessionConf.getOption(confKey)
        if (optionalValueInSession.isDefined && optionalValueInSession.get != valueInMetadata) {
          // Keep the user's value instead of the value from the offset log
          sessionConf.set(confKey, optionalValueInSession.get)
        } else {
          // getOrElse avoids calling .get on None when the session has no value
          logWarning(s"Updating the value of conf '$confKey' in current session from " +
            s"'${optionalValueInSession.getOrElse("<unset>")}' to '$valueInMetadata'.")
          sessionConf.set(confKey, valueInMetadata)
        }
      case None =>
        // For backward compatibility, if a config was not recorded in the offset log,
        // then log it, and let the existing conf value in SparkSession prevail.
        logWarning(s"Conf '$confKey' was not found in the offset log, using existing value")
    }
  }
}
```
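The proposed precedence can be illustrated with a minimal, Spark-free sketch. It is a hypothetical model, not Spark code: `ProposedPrecedence` and `relevantKeys` are illustrative names, logging is omitted, and, as a simplification, the mutable `Map` standing in for the session configuration holds no default values, so an unset key is really absent.

```scala
import scala.collection.mutable

// Hypothetical, Spark-free model of the proposed precedence rule.
// metadataConf stands in for OffsetSeqMetadata.conf (the offset log);
// sessionConf stands in for the session's RuntimeConfig.
object ProposedPrecedence {
  // Hypothetical stand-in for the keys of OffsetSeqMetadata.relevantSQLConfs
  val relevantKeys: Seq[String] = Seq("spark.sql.shuffle.partitions")

  def setSessionConf(metadataConf: Map[String, String],
                     sessionConf: mutable.Map[String, String]): Unit = {
    relevantKeys.foreach { confKey =>
      metadataConf.get(confKey) match {
        case Some(valueInMetadata) =>
          sessionConf.get(confKey) match {
            // A user-set value that differs from the offset log is kept
            case Some(valueInSession) if valueInSession != valueInMetadata =>
              sessionConf(confKey) = valueInSession
            // No user value (or the same value): use the offset log value
            case _ =>
              sessionConf(confKey) = valueInMetadata
          }
        case None =>
          // Not recorded in the offset log: keep whatever the session already has
          ()
      }
    }
  }
}
```

Under this rule, a session value of `100` survives a recorded value of `200`, while a session with no value picks up `200` from the offset log.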
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/bjkonglu/spark OffsetSeq
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/21718.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #21718
----
commit 941f123f0a6b9c013394a1004d79740debcc17fe
Author: bjkonglu <konglu@...>
Date: 2018-07-05T08:58:04Z
[SPARK-24744][STRUCTRURED STREAMING] Set the SparkSession configuration
with user set if there are values in metadata and values in user sets.
----