Repository: spark
Updated Branches:
  refs/heads/master 6c5cb8585 -> ac0174e55


[SPARK-25129][SQL] Make the mapping of com.databricks.spark.avro to built-in module configurable

## What changes were proposed in this pull request?

In https://issues.apache.org/jira/browse/SPARK-24924, the data source provider com.databricks.spark.avro is mapped to the new package org.apache.spark.sql.avro.

As per the discussion in the [Jira](https://issues.apache.org/jira/browse/SPARK-24924) and PR #22119, we should make the mapping configurable.
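
For illustration, a minimal sketch of how the new flag could be used from a Spark session. The configuration key and its default of `true` come from the patch below; the `spark` session variable and the file path are placeholders:

```scala
// With the default (true), the legacy provider name still resolves to the
// built-in Avro module, so existing jobs keep working unchanged.
val df = spark.read.format("com.databricks.spark.avro").load("/path/to/data.avro")

// Setting the flag to false disables the legacy mapping; resolving
// "com.databricks.spark.avro" afterwards fails with an AnalysisException.
spark.conf.set("spark.sql.legacy.replaceDatabricksSparkAvro.enabled", "false")
```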

This PR also improves the error message shown when the Avro/Kafka data source cannot be found.
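
For example, with the external avro module absent from the classpath, the lookup now fails with a pointed AnalysisException instead of a bare ClassNotFoundException. The message text below is taken from the patch; the read call itself is illustrative:

```scala
spark.read.format("avro").load("/path/to/data.avro")
// org.apache.spark.sql.AnalysisException: Failed to find data source: avro.
// Avro is built-in but external data source module since Spark 2.4. Please
// deploy the application as per the deployment section of
// "Apache Avro Data Source Guide".
```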

## How was this patch tested?

Unit tests (AvroSuite and ResolvedDataSourceSuite).

Closes #22133 from gengliangwang/configurable_avro_mapping.

Authored-by: Gengliang Wang <gengliang.w...@databricks.com>
Signed-off-by: Xiao Li <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac0174e5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac0174e5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac0174e5

Branch: refs/heads/master
Commit: ac0174e55af2e935d41545721e9f430c942b3a0c
Parents: 6c5cb85
Author: Gengliang Wang <gengliang.w...@databricks.com>
Authored: Tue Aug 21 15:26:24 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Tue Aug 21 15:26:24 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/avro/AvroSuite.scala     | 11 ++++++++++-
 .../org/apache/spark/sql/internal/SQLConf.scala   | 10 ++++++++++
 .../sql/execution/datasources/DataSource.scala    | 16 ++++++++++++++--
 .../sql/sources/ResolvedDataSourceSuite.scala     | 18 ++++++++++++++++++
 4 files changed, 52 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ac0174e5/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index c4f4d8e..72bef9e 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -77,10 +77,19 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   }
 
   test("resolve avro data source") {
-    Seq("avro", "com.databricks.spark.avro").foreach { provider =>
+    val databricksAvro = "com.databricks.spark.avro"
+    // By default the backward compatibility for com.databricks.spark.avro is enabled.
+    Seq("avro", "org.apache.spark.sql.avro.AvroFileFormat", databricksAvro).foreach { provider =>
       assert(DataSource.lookupDataSource(provider, spark.sessionState.conf) ===
         classOf[org.apache.spark.sql.avro.AvroFileFormat])
     }
+
+    withSQLConf(SQLConf.LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED.key -> "false") {
+      val message = intercept[AnalysisException] {
+        DataSource.lookupDataSource(databricksAvro, spark.sessionState.conf)
+      }.getMessage
+      assert(message.contains(s"Failed to find data source: $databricksAvro"))
+    }
   }
 
   test("reading from multiple paths") {

http://git-wip-us.apache.org/repos/asf/spark/blob/ac0174e5/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b44bfe7..5913c94 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1469,6 +1469,13 @@ object SQLConf {
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
 
+  val LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED =
+    buildConf("spark.sql.legacy.replaceDatabricksSparkAvro.enabled")
+      .doc("If it is set to true, the data source provider 
com.databricks.spark.avro is mapped " +
+        "to the built-in but external Avro data source module for backward 
compatibility.")
+      .booleanConf
+      .createWithDefault(true)
+
   val LEGACY_SETOPS_PRECEDENCE_ENABLED =
     buildConf("spark.sql.legacy.setopsPrecedence.enabled")
       .internal()
@@ -1881,6 +1888,9 @@ class SQLConf extends Serializable with Logging {
 
   def avroDeflateLevel: Int = getConf(SQLConf.AVRO_DEFLATE_LEVEL)
 
+  def replaceDatabricksSparkAvroEnabled: Boolean =
+    getConf(SQLConf.LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED)
+
   def setOpsPrecedenceEnforced: Boolean = getConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED)
 
   def parallelFileListingInStatsComputation: Boolean =

http://git-wip-us.apache.org/repos/asf/spark/blob/ac0174e5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index b1a10fd..1dcf9f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -571,7 +571,6 @@ object DataSource extends Logging {
     val nativeOrc = classOf[OrcFileFormat].getCanonicalName
     val socket = classOf[TextSocketSourceProvider].getCanonicalName
     val rate = classOf[RateStreamProvider].getCanonicalName
-    val avro = "org.apache.spark.sql.avro.AvroFileFormat"
 
     Map(
       "org.apache.spark.sql.jdbc" -> jdbc,
@@ -593,7 +592,6 @@ object DataSource extends Logging {
       "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
       "org.apache.spark.ml.source.libsvm" -> libsvm,
       "com.databricks.spark.csv" -> csv,
-      "com.databricks.spark.avro" -> avro,
       "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> 
socket,
       "org.apache.spark.sql.execution.streaming.RateSourceProvider" -> rate
     )
@@ -616,6 +614,8 @@ object DataSource extends Logging {
       case name if name.equalsIgnoreCase("orc") &&
           conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
         "org.apache.spark.sql.hive.orc.OrcFileFormat"
+      case "com.databricks.spark.avro" if 
conf.replaceDatabricksSparkAvroEnabled =>
+        "org.apache.spark.sql.avro.AvroFileFormat"
       case name => name
     }
     val provider2 = s"$provider1.DefaultSource"
@@ -637,6 +637,18 @@ object DataSource extends Logging {
                     "Hive built-in ORC data source must be used with Hive 
support enabled. " +
                     "Please use the native ORC data source by setting 
'spark.sql.orc.impl' to " +
                     "'native'")
+                } else if (provider1.toLowerCase(Locale.ROOT) == "avro" ||
+                  provider1 == "com.databricks.spark.avro" ||
+                  provider1 == "org.apache.spark.sql.avro") {
+                  throw new AnalysisException(
+                    s"Failed to find data source: $provider1. Avro is built-in 
but external data " +
+                    "source module since Spark 2.4. Please deploy the 
application as per " +
+                    "the deployment section of \"Apache Avro Data Source 
Guide\".")
+                } else if (provider1.toLowerCase(Locale.ROOT) == "kafka") {
+                  throw new AnalysisException(
+                    s"Failed to find data source: $provider1. Please deploy 
the application as " +
+                    "per the deployment section of " +
+                    "\"Structured Streaming + Kafka Integration Guide\".")
                 } else {
                   throw new ClassNotFoundException(
                     s"Failed to find data source: $provider1. Please find 
packages at " +

http://git-wip-us.apache.org/repos/asf/spark/blob/ac0174e5/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 95460fa..0aa67bf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -76,6 +76,24 @@ class ResolvedDataSourceSuite extends SparkFunSuite with SharedSQLContext {
         classOf[org.apache.spark.sql.execution.datasources.csv.CSVFileFormat])
   }
 
+  test("avro: show deploy guide for loading the external avro module") {
+    Seq("avro", "org.apache.spark.sql.avro").foreach { provider =>
+      val message = intercept[AnalysisException] {
+        getProvidingClass(provider)
+      }.getMessage
+      assert(message.contains(s"Failed to find data source: $provider"))
+      assert(message.contains("Please deploy the application as per the 
deployment section of"))
+    }
+  }
+
+  test("kafka: show deploy guide for loading the external kafka module") {
+    val message = intercept[AnalysisException] {
+      getProvidingClass("kafka")
+    }.getMessage
+    assert(message.contains("Failed to find data source: kafka"))
+    assert(message.contains("Please deploy the application as per the 
deployment section of"))
+  }
+
   test("error message for unknown data sources") {
     val error = intercept[ClassNotFoundException] {
       getProvidingClass("asfdwefasdfasdf")

