[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark

2018-12-06 Thread stczwd
Github user stczwd commented on a diff in the pull request:

https://github.com/apache/spark/pull/22575#discussion_r239670323
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -631,6 +631,33 @@ object SQLConf {
 .intConf
 .createWithDefault(200)
 
+  val SQLSTREAM_WATERMARK_ENABLE = 
buildConf("spark.sqlstreaming.watermark.enable")
+.doc("Whether use watermark in sqlstreaming.")
+.booleanConf
+.createWithDefault(false)
+
+  val SQLSTREAM_OUTPUTMODE = buildConf("spark.sqlstreaming.outputMode")
+.doc("The output mode used in sqlstreaming")
+.stringConf
+.createWithDefault("append")
+
+  val SQLSTREAM_TRIGGER = buildConf("spark.sqlstreaming.trigger")
--- End diff --

> insert into kafka_sql_out select stream t1.value from (select cast(value as string), timestamp as time1 from kafka_sql_in1) as t1 inner join (select cast(value as string), timestamp as time2 from kafka_sql_in2) as t2 on time1 >= time2 and time1 <= time2 + interval 10 seconds where t1.value == t2.value

No, SQLStreaming supports stream-stream joins. The watermark config is put in 
the table properties, so each source table can carry its own settings; see the 
sketch below. As for the trigger interval: would different sources in a 
stream-stream join scenario really need different trigger configs?
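
For illustration, here is a minimal sketch of per-table watermark settings 
carried in table properties. This is a sketch only: the property keys, table 
schema, and Kafka options below are hypothetical, not taken from this PR, and 
an active SparkSession named `spark` is assumed.

    // Hypothetical sketch: each stream source table carries its own
    // watermark settings in TBLPROPERTIES, so the two sides of a
    // stream-stream join can be configured independently.
    spark.sql(
      """CREATE TABLE kafka_sql_in1 (value BINARY, timestamp TIMESTAMP)
        |USING kafka
        |OPTIONS (
        |  subscribe = 'topic1',
        |  isStreaming = 'true')
        |TBLPROPERTIES (
        |  'spark.sqlstreaming.watermark.column' = 'timestamp',
        |  'spark.sqlstreaming.watermark.delay' = '10 seconds')
      """.stripMargin)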


---




[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark

2018-12-06 Thread sujith71955
Github user sujith71955 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22575#discussion_r239500890
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -631,6 +631,33 @@ object SQLConf {
 .intConf
 .createWithDefault(200)
 
+  val SQLSTREAM_WATERMARK_ENABLE = 
buildConf("spark.sqlstreaming.watermark.enable")
+.doc("Whether use watermark in sqlstreaming.")
+.booleanConf
+.createWithDefault(false)
+
+  val SQLSTREAM_OUTPUTMODE = buildConf("spark.sqlstreaming.outputMode")
+.doc("The output mode used in sqlstreaming")
+.stringConf
+.createWithDefault("append")
+
+  val SQLSTREAM_TRIGGER = buildConf("spark.sqlstreaming.trigger")
--- End diff --

So stream-stream join is not supported here, right? To elaborate: can I create 
two stream source tables, join them, and write the result to a sink? If I want 
to create two streams for two different topics, I may need to provide different 
configurations for the watermark, window, or trigger interval.


---




[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark

2018-12-05 Thread stczwd
Github user stczwd commented on a diff in the pull request:

https://github.com/apache/spark/pull/22575#discussion_r239113033
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -631,6 +631,33 @@ object SQLConf {
 .intConf
 .createWithDefault(200)
 
+  val SQLSTREAM_WATERMARK_ENABLE = 
buildConf("spark.sqlstreaming.watermark.enable")
+.doc("Whether use watermark in sqlstreaming.")
+.booleanConf
+.createWithDefault(false)
+
+  val SQLSTREAM_OUTPUTMODE = buildConf("spark.sqlstreaming.outputMode")
+.doc("The output mode used in sqlstreaming")
+.stringConf
+.createWithDefault("append")
+
+  val SQLSTREAM_TRIGGER = buildConf("spark.sqlstreaming.trigger")
--- End diff --

I don't think there are any problems with this. SQLStreaming uses a Command to 
run the streaming query, which is similar to InsertIntoHiveTable; therefore, a 
unified solution for batch SQL and streaming SQL is the expected behavior. In 
addition, an application can currently run only one streaming SQL query.


---




[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark

2018-12-05 Thread stczwd
Github user stczwd commented on a diff in the pull request:

https://github.com/apache/spark/pull/22575#discussion_r239109280
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SQLStreamingSink.scala
 ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
+import org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.Utils
+
+/**
+ * The basic RunnableCommand for SQLStreaming, using Command.run to start 
a streaming query.
+ *
+ * @param sparkSession
+ * @param extraOptions
+ * @param partitionColumnNames
+ * @param child
+ */
+case class SQLStreamingSink(sparkSession: SparkSession,
+table: CatalogTable,
+child: LogicalPlan)
+  extends RunnableCommand {
+
+  private val sqlConf = sparkSession.sqlContext.conf
+
+  /**
+   * The given column name may not be equal to any of the existing column 
names if we were in
+   * case-insensitive context. Normalize the given column name to the real 
one so that we don't
+   * need to care about case sensitivity afterwards.
+   */
+  private def normalize(df: DataFrame, columnName: String, columnType: 
String): String = {
+val validColumnNames = df.logicalPlan.output.map(_.name)
+validColumnNames.find(sparkSession.sessionState.analyzer.resolver(_, 
columnName))
+  .getOrElse(throw new AnalysisException(s"$columnType column 
$columnName not found in " +
+s"existing columns (${validColumnNames.mkString(", ")})"))
+  }
+
+  /**
+   * Parse spark.sqlstreaming.trigger.seconds to Trigger
+   */
+  private def parseTrigger(): Trigger = {
+val trigger = Utils.timeStringAsMs(sqlConf.sqlStreamTrigger)
+Trigger.ProcessingTime(trigger, TimeUnit.MICROSECONDS)
--- End diff --

Yeah, I will change it to milliseconds.
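
For reference, a sketch of the corrected helper: `Utils.timeStringAsMs` already 
returns milliseconds, so the matching unit for `Trigger.ProcessingTime` is 
`TimeUnit.MILLISECONDS`. This assumes the same context as the diff above, where 
`sqlConf.sqlStreamTrigger` holds the configured interval string.

    import java.util.concurrent.TimeUnit

    import org.apache.spark.sql.streaming.Trigger
    import org.apache.spark.util.Utils

    // Utils.timeStringAsMs parses strings such as "10 seconds" into a
    // millisecond count, so the Trigger must be built with MILLISECONDS.
    private def parseTrigger(): Trigger = {
      val intervalMs = Utils.timeStringAsMs(sqlConf.sqlStreamTrigger)
      Trigger.ProcessingTime(intervalMs, TimeUnit.MILLISECONDS)
    }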


---




[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark

2018-12-03 Thread sujith71955
Github user sujith71955 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22575#discussion_r238336135
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SQLStreamingSink.scala
 ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
+import org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.Utils
+
+/**
+ * The basic RunnableCommand for SQLStreaming, using Command.run to start 
a streaming query.
+ *
+ * @param sparkSession
+ * @param extraOptions
+ * @param partitionColumnNames
+ * @param child
+ */
+case class SQLStreamingSink(sparkSession: SparkSession,
+table: CatalogTable,
+child: LogicalPlan)
+  extends RunnableCommand {
+
+  private val sqlConf = sparkSession.sqlContext.conf
+
+  /**
+   * The given column name may not be equal to any of the existing column 
names if we were in
+   * case-insensitive context. Normalize the given column name to the real 
one so that we don't
+   * need to care about case sensitivity afterwards.
+   */
+  private def normalize(df: DataFrame, columnName: String, columnType: 
String): String = {
+val validColumnNames = df.logicalPlan.output.map(_.name)
+validColumnNames.find(sparkSession.sessionState.analyzer.resolver(_, 
columnName))
+  .getOrElse(throw new AnalysisException(s"$columnType column 
$columnName not found in " +
+s"existing columns (${validColumnNames.mkString(", ")})"))
+  }
+
+  /**
+   * Parse spark.sqlstreaming.trigger.seconds to Trigger
+   */
+  private def parseTrigger(): Trigger = {
+val trigger = Utils.timeStringAsMs(sqlConf.sqlStreamTrigger)
+Trigger.ProcessingTime(trigger, TimeUnit.MICROSECONDS)
--- End diff --

Do we require a microseconds unit here? Milliseconds or seconds will do, I 
guess; the lowest latency supported by Structured Streaming is 100 ms.


---




[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark

2018-12-03 Thread sujith71955
Github user sujith71955 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22575#discussion_r238329995
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -631,6 +631,33 @@ object SQLConf {
 .intConf
 .createWithDefault(200)
 
+  val SQLSTREAM_WATERMARK_ENABLE = 
buildConf("spark.sqlstreaming.watermark.enable")
+.doc("Whether use watermark in sqlstreaming.")
+.booleanConf
+.createWithDefault(false)
+
+  val SQLSTREAM_OUTPUTMODE = buildConf("spark.sqlstreaming.outputMode")
+.doc("The output mode used in sqlstreaming")
+.stringConf
+.createWithDefault("append")
+
+  val SQLSTREAM_TRIGGER = buildConf("spark.sqlstreaming.trigger")
--- End diff --

We have so many configurations. Think of Thrift Server scenarios, where a user 
can open multiple sessions and run streaming queries in different query 
contexts: each query will require its own trigger interval, watermarking, and 
windowing settings. Can you elaborate a bit on how we address these scenarios?
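
One way such per-query settings could be scoped, assuming the entries remain 
ordinary SQLConf settings (a sketch of that assumption, not what this PR 
implements): SQLConf is session-scoped, so each Thrift Server session could SET 
its own values without affecting other sessions.

    import org.apache.spark.sql.SparkSession

    // Sketch: each session gets its own SQLConf copy, so per-session SET
    // commands scope the streaming options to that session's query only.
    val spark = SparkSession.builder().appName("sqlstreaming-conf").getOrCreate()

    val session1 = spark.newSession()
    session1.sql("SET spark.sqlstreaming.trigger=10 seconds")
    session1.sql("SET spark.sqlstreaming.watermark.enable=true")

    val session2 = spark.newSession()
    session2.sql("SET spark.sqlstreaming.trigger=1 minute")
    // Each session's streaming query then picks up only its own values.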


---




[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark

2018-11-29 Thread stczwd
Github user stczwd commented on a diff in the pull request:

https://github.com/apache/spark/pull/22575#discussion_r237721103
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StreamTableDDLCommandSuite.scala
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class StreamTableDDLCommandSuite extends SQLTestUtils with 
TestHiveSingleton {
+  private val catalog = spark.sessionState.catalog
+
+  test("CTAS: create data source stream table") {
+withTempPath { dir =>
+  withTable("t") {
+sql(
+  s"""CREATE TABLE t USING PARQUET
+ |OPTIONS (
+ |PATH = '${dir.toURI}',
+ |location = '${dir.toURI}',
+ |isStreaming = 'true')
+ |AS SELECT 1 AS a, 2 AS b, 3 AS c
+  """.stripMargin)
--- End diff --

Here, `child` is a streaming LogicalPlan. If it is not, an exception is thrown.
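
A minimal sketch of that guard, assuming it lives alongside SQLStreamingSink in 
the same package (the helper name and message are illustrative; 
`LogicalPlan.isStreaming` is the standard flag for this check):

    import org.apache.spark.sql.AnalysisException
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Illustrative guard: reject a non-streaming child plan up front.
    private def assertStreamingPlan(child: LogicalPlan): Unit = {
      if (!child.isStreaming) {
        throw new AnalysisException(
          s"SQLStreaming requires a streaming child plan, got: ${child.nodeName}")
      }
    }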


---




[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark

2018-11-28 Thread uncleGen
Github user uncleGen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22575#discussion_r237372804
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StreamTableDDLCommandSuite.scala
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class StreamTableDDLCommandSuite extends SQLTestUtils with 
TestHiveSingleton {
+  private val catalog = spark.sessionState.catalog
+
+  test("CTAS: create data source stream table") {
+withTempPath { dir =>
+  withTable("t") {
+sql(
+  s"""CREATE TABLE t USING PARQUET
+ |OPTIONS (
+ |PATH = '${dir.toURI}',
+ |location = '${dir.toURI}',
+ |isStreaming = 'true')
+ |AS SELECT 1 AS a, 2 AS b, 3 AS c
+  """.stripMargin)
--- End diff --

At 
https://github.com/apache/spark/pull/22575/files#diff-fa4547f0c6dd7810576cd4262a2dfb46R78

is the `child` logicalPlan not a streaming logicalPlan?


---




[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark

2018-10-21 Thread stczwd
Github user stczwd commented on a diff in the pull request:

https://github.com/apache/spark/pull/22575#discussion_r226853809
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -631,6 +631,33 @@ object SQLConf {
 .intConf
 .createWithDefault(200)
 
+  val SQLSTREAM_OUTPUTMODE = buildConf("spark.sqlstreaming.outputMode")
--- End diff --

Then could you suggest a more appropriate name? These configurations should 
be used only in SQLStreaming.


---




[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark

2018-10-21 Thread stczwd
Github user stczwd commented on a diff in the pull request:

https://github.com/apache/spark/pull/22575#discussion_r226853804
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StreamTableDDLCommandSuite.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class StreamTableDDLCommandSuite extends SQLTestUtils with 
TestHiveSingleton {
+  private val catalog = spark.sessionState.catalog
+
+  test("CTAS: create data source stream table") {
+withTempPath { dir =>
+  withTable("t") {
+sql(
+  s"""CREATE TABLE t USING PARQUET
+ |OPTIONS (
+ |PATH = '${dir.toURI}',
+|location = '${dir.toURI}',
--- End diff --

Thanks


---




[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark

2018-10-21 Thread stczwd
Github user stczwd commented on a diff in the pull request:

https://github.com/apache/spark/pull/22575#discussion_r226853724
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -63,7 +63,9 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
   providerName: String,
   parameters: Map[String, String]): (String, StructType) = {
 validateStreamOptions(parameters)
-require(schema.isEmpty, "Kafka source has a fixed schema and cannot be 
set with a custom one")
+if(schema.isDefined) {
--- End diff --

KafkaStreamSourceProvider is the entry point for SQLStreaming's Kafka source. 
When a DataSource is created from a Kafka streaming table, the table's schema 
is passed in, so KafkaSourceProvider should be compatible with a provided 
schema.
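
A hedged sketch of the relaxed check: the diff above is truncated after 
`if(schema.isDefined)`, so the body below is an assumption about the intent, 
namely accepting a user-supplied schema only when it matches Kafka's fixed 
schema rather than rejecting it outright. The helper name is made up.

    import org.apache.spark.sql.types.StructType

    // Assumption only: validate a user-provided schema against Kafka's
    // fixed schema instead of requiring that no schema be passed at all.
    private def validateUserSchema(
        userSchema: Option[StructType],
        kafkaFixedSchema: StructType): Unit = {
      userSchema.foreach { s =>
        require(s == kafkaFixedSchema,
          s"Kafka source has a fixed schema $kafkaFixedSchema; got $s")
      }
    }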


---




[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark

2018-10-17 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22575#discussion_r225998489
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StreamTableDDLCommandSuite.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class StreamTableDDLCommandSuite extends SQLTestUtils with 
TestHiveSingleton {
+  private val catalog = spark.sessionState.catalog
+
+  test("CTAS: create data source stream table") {
+withTempPath { dir =>
+  withTable("t") {
+sql(
+  s"""CREATE TABLE t USING PARQUET
+ |OPTIONS (
+ |PATH = '${dir.toURI}',
+|location = '${dir.toURI}',
--- End diff --

nit: indent here.


---




[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark

2018-10-17 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22575#discussion_r225997731
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -631,6 +631,33 @@ object SQLConf {
 .intConf
 .createWithDefault(200)
 
+  val SQLSTREAM_OUTPUTMODE = buildConf("spark.sqlstreaming.outputMode")
--- End diff --

Not sure `spark.sqlstreaming` is the right naming convention for these configs.


---




[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark

2018-10-17 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22575#discussion_r225992780
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -63,7 +63,9 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
   providerName: String,
   parameters: Map[String, String]): (String, StructType) = {
 validateStreamOptions(parameters)
-require(schema.isEmpty, "Kafka source has a fixed schema and cannot be 
set with a custom one")
+if(schema.isDefined) {
--- End diff --

Why need this change?


---
