Repository: spark Updated Branches: refs/heads/master 6c5cb8585 -> ac0174e55
[SPARK-25129][SQL] Make the mapping of com.databricks.spark.avro to built-in module configurable ## What changes were proposed in this pull request? In https://issues.apache.org/jira/browse/SPARK-24924, the data source provider com.databricks.spark.avro is mapped to the new package org.apache.spark.sql.avro . As per the discussion in the [Jira](https://issues.apache.org/jira/browse/SPARK-24924) and PR #22119, we should make the mapping configurable. This PR also improves the error message shown when the Avro or Kafka data source is not found. ## How was this patch tested? Unit test Closes #22133 from gengliangwang/configurable_avro_mapping. Authored-by: Gengliang Wang <gengliang.w...@databricks.com> Signed-off-by: Xiao Li <gatorsm...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac0174e5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac0174e5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac0174e5 Branch: refs/heads/master Commit: ac0174e55af2e935d41545721e9f430c942b3a0c Parents: 6c5cb85 Author: Gengliang Wang <gengliang.w...@databricks.com> Authored: Tue Aug 21 15:26:24 2018 -0700 Committer: Xiao Li <gatorsm...@gmail.com> Committed: Tue Aug 21 15:26:24 2018 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/avro/AvroSuite.scala | 11 ++++++++++- .../org/apache/spark/sql/internal/SQLConf.scala | 10 ++++++++++ .../sql/execution/datasources/DataSource.scala | 16 ++++++++++++++-- .../sql/sources/ResolvedDataSourceSuite.scala | 18 ++++++++++++++++++ 4 files changed, 52 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ac0174e5/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---------------------------------------------------------------------- diff --git 
a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index c4f4d8e..72bef9e 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -77,10 +77,19 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } test("resolve avro data source") { - Seq("avro", "com.databricks.spark.avro").foreach { provider => + val databricksAvro = "com.databricks.spark.avro" + // By default the backward compatibility for com.databricks.spark.avro is enabled. + Seq("avro", "org.apache.spark.sql.avro.AvroFileFormat", databricksAvro).foreach { provider => assert(DataSource.lookupDataSource(provider, spark.sessionState.conf) === classOf[org.apache.spark.sql.avro.AvroFileFormat]) } + + withSQLConf(SQLConf.LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED.key -> "false") { + val message = intercept[AnalysisException] { + DataSource.lookupDataSource(databricksAvro, spark.sessionState.conf) + }.getMessage + assert(message.contains(s"Failed to find data source: $databricksAvro")) + } } test("reading from multiple paths") { http://git-wip-us.apache.org/repos/asf/spark/blob/ac0174e5/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index b44bfe7..5913c94 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1469,6 +1469,13 @@ object SQLConf { .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION) .createWithDefault(Deflater.DEFAULT_COMPRESSION) + val LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED = + 
buildConf("spark.sql.legacy.replaceDatabricksSparkAvro.enabled") + .doc("If it is set to true, the data source provider com.databricks.spark.avro is mapped " + + "to the built-in but external Avro data source module for backward compatibility.") + .booleanConf + .createWithDefault(true) + val LEGACY_SETOPS_PRECEDENCE_ENABLED = buildConf("spark.sql.legacy.setopsPrecedence.enabled") .internal() @@ -1881,6 +1888,9 @@ class SQLConf extends Serializable with Logging { def avroDeflateLevel: Int = getConf(SQLConf.AVRO_DEFLATE_LEVEL) + def replaceDatabricksSparkAvroEnabled: Boolean = + getConf(SQLConf.LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED) + def setOpsPrecedenceEnforced: Boolean = getConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED) def parallelFileListingInStatsComputation: Boolean = http://git-wip-us.apache.org/repos/asf/spark/blob/ac0174e5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index b1a10fd..1dcf9f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -571,7 +571,6 @@ object DataSource extends Logging { val nativeOrc = classOf[OrcFileFormat].getCanonicalName val socket = classOf[TextSocketSourceProvider].getCanonicalName val rate = classOf[RateStreamProvider].getCanonicalName - val avro = "org.apache.spark.sql.avro.AvroFileFormat" Map( "org.apache.spark.sql.jdbc" -> jdbc, @@ -593,7 +592,6 @@ object DataSource extends Logging { "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm, "org.apache.spark.ml.source.libsvm" -> libsvm, "com.databricks.spark.csv" -> csv, - "com.databricks.spark.avro" -> avro, 
"org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket, "org.apache.spark.sql.execution.streaming.RateSourceProvider" -> rate ) @@ -616,6 +614,8 @@ object DataSource extends Logging { case name if name.equalsIgnoreCase("orc") && conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" => "org.apache.spark.sql.hive.orc.OrcFileFormat" + case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled => + "org.apache.spark.sql.avro.AvroFileFormat" case name => name } val provider2 = s"$provider1.DefaultSource" @@ -637,6 +637,18 @@ object DataSource extends Logging { "Hive built-in ORC data source must be used with Hive support enabled. " + "Please use the native ORC data source by setting 'spark.sql.orc.impl' to " + "'native'") + } else if (provider1.toLowerCase(Locale.ROOT) == "avro" || + provider1 == "com.databricks.spark.avro" || + provider1 == "org.apache.spark.sql.avro") { + throw new AnalysisException( + s"Failed to find data source: $provider1. Avro is built-in but external data " + + "source module since Spark 2.4. Please deploy the application as per " + + "the deployment section of \"Apache Avro Data Source Guide\".") + } else if (provider1.toLowerCase(Locale.ROOT) == "kafka") { + throw new AnalysisException( + s"Failed to find data source: $provider1. Please deploy the application as " + + "per the deployment section of " + + "\"Structured Streaming + Kafka Integration Guide\".") } else { throw new ClassNotFoundException( s"Failed to find data source: $provider1. 
Please find packages at " + http://git-wip-us.apache.org/repos/asf/spark/blob/ac0174e5/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala index 95460fa..0aa67bf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala @@ -76,6 +76,24 @@ class ResolvedDataSourceSuite extends SparkFunSuite with SharedSQLContext { classOf[org.apache.spark.sql.execution.datasources.csv.CSVFileFormat]) } + test("avro: show deploy guide for loading the external avro module") { + Seq("avro", "org.apache.spark.sql.avro").foreach { provider => + val message = intercept[AnalysisException] { + getProvidingClass(provider) + }.getMessage + assert(message.contains(s"Failed to find data source: $provider")) + assert(message.contains("Please deploy the application as per the deployment section of")) + } + } + + test("kafka: show deploy guide for loading the external kafka module") { + val message = intercept[AnalysisException] { + getProvidingClass("kafka") + }.getMessage + assert(message.contains("Failed to find data source: kafka")) + assert(message.contains("Please deploy the application as per the deployment section of")) + } + test("error message for unknown data sources") { val error = intercept[ClassNotFoundException] { getProvidingClass("asfdwefasdfasdf") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org