GitHub user bjkonglu opened a pull request:

    https://github.com/apache/spark/pull/21718

    [SPARK-24744][STRUCTURED STREAMING] Set the SparkSession configurati…

    # Background
    When I build an application with Structured Streaming, something odd happens: the application always uses the default value (200) for `spark.sql.shuffle.partitions`. Even though I set `spark.sql.shuffle.partitions` to another value via SparkConf or `--conf spark.sql.shuffle.partitions=100`, it does not take effect; the option keeps the default value as before.
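    For illustration, here is a minimal sketch of the kind of job where I hit this; the app name, source, sink and checkpoint path are hypothetical placeholders, not my real application:

    ```scala
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("shuffle-partitions-demo")             // hypothetical app name
      .config("spark.sql.shuffle.partitions", "100")  // value set by the user
      .getOrCreate()
    import spark.implicits._

    // A streaming aggregation (it shuffles), restarted from an existing checkpoint.
    val counts = spark.readStream
      .format("rate")                                 // hypothetical source, just for the sketch
      .load()
      .groupBy($"value" % 10)
      .count()

    counts.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/demo-checkpoint")  // hypothetical path
      .outputMode("complete")
      .start()

    // If the checkpoint was originally written with the default of 200, then after a restart
    // the session reports 200 (the value recorded in the offset log), not the 100 set above.
    println(spark.conf.get("spark.sql.shuffle.partitions"))
    ```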
    # Analysis
    I reviewed the relevant code, which lives in `org.apache.spark.sql.execution.streaming.OffsetSeqMetadata`:
    
    ```scala
    /** Set the SparkSession configuration with the values in the metadata */
    def setSessionConf(metadata: OffsetSeqMetadata, sessionConf: RuntimeConfig): Unit = {
      OffsetSeqMetadata.relevantSQLConfs.map(_.key).foreach { confKey =>
        metadata.conf.get(confKey) match {
          case Some(valueInMetadata) =>
            // Config value exists in the metadata, update the session config with this value
            val optionalValueInSession = sessionConf.getOption(confKey)
            if (optionalValueInSession.isDefined && optionalValueInSession.get != valueInMetadata) {
              logWarning(s"Updating the value of conf '$confKey' in current session from " +
                s"'${optionalValueInSession.get}' to '$valueInMetadata'.")
            }
            sessionConf.set(confKey, valueInMetadata)

          case None =>
            // For backward compatibility, if a config was not recorded in the offset log,
            // then log it, and let the existing conf value in SparkSession prevail.
            logWarning(s"Conf '$confKey' was not found in the offset log, using existing value")
        }
      }
    }
    ```
    In this code, the session config is always overwritten with the value from the metadata. But as users, we want options we set explicitly to take effect, so I changed the code as follows.
    
    ```scala
    /** Set the SparkSession configuration with the values in the metadata */
    def setSessionConf(metadata: OffsetSeqMetadata, sessionConf: RuntimeConfig): Unit = {
      OffsetSeqMetadata.relevantSQLConfs.map(_.key).foreach { confKey =>
        metadata.conf.get(confKey) match {
          case Some(valueInMetadata) =>
            // Config value exists in the metadata, but a value the user explicitly set in
            // the current session takes precedence over it.
            val optionalValueInSession = sessionConf.getOption(confKey)
            if (optionalValueInSession.isDefined && optionalValueInSession.get != valueInMetadata) {
              sessionConf.set(confKey, optionalValueInSession.get)
            } else {
              logWarning(s"Updating the value of conf '$confKey' in current session from " +
                s"'${optionalValueInSession.getOrElse("<undefined>")}' to '$valueInMetadata'.")
              sessionConf.set(confKey, valueInMetadata)
            }

          case None =>
            // For backward compatibility, if a config was not recorded in the offset log,
            // then log it, and let the existing conf value in SparkSession prevail.
            logWarning(s"Conf '$confKey' was not found in the offset log, using existing value")
        }
      }
    }
    ```
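    To make the intended precedence concrete, here is a small standalone sketch of the rule this change implements, written in plain Scala with `Map`s standing in for the offset-log metadata and the session conf (the function name and values are hypothetical, for illustration only):

    ```scala
    // Hypothetical illustration of the intended precedence; this is not Spark API.
    def resolveConf(
        confKey: String,
        metadataConf: Map[String, String],  // values recorded in the offset log
        sessionConf: Map[String, String]    // values currently set in the SparkSession
      ): Option[String] = {
      (metadataConf.get(confKey), sessionConf.get(confKey)) match {
        // The user explicitly set a different value in the session: keep the user's value.
        case (Some(inMetadata), Some(inSession)) if inSession != inMetadata => Some(inSession)
        // Otherwise use the value recorded in the offset log, if there is one.
        case (Some(inMetadata), _) => Some(inMetadata)
        // Not recorded in the offset log: keep whatever the session already has.
        case (None, inSession) => inSession
      }
    }

    // Example: the checkpoint recorded 200, the user passed --conf ...=100 -> 100 wins.
    assert(resolveConf(
      "spark.sql.shuffle.partitions",
      metadataConf = Map("spark.sql.shuffle.partitions" -> "200"),
      sessionConf = Map("spark.sql.shuffle.partitions" -> "100")
    ) == Some("100"))
    ```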
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bjkonglu/spark OffsetSeq

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21718.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21718
    
----
commit 941f123f0a6b9c013394a1004d79740debcc17fe
Author: bjkonglu <konglu@...>
Date:   2018-07-05T08:58:04Z

    [SPARK-24744][STRUCTURED STREAMING] Set the SparkSession configuration with the user-set values if there are values both in the metadata and set by the user.

----


---
