GitHub user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/20554#discussion_r167811474
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
@@ -28,50 +28,40 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
-import org.apache.spark.sql.execution.streaming.{Sink, Source}
+import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 /**
- * The provider class for the [[KafkaSource]]. This provider is designed such that it throws
+ * The provider class for all Kafka readers and writers. It is designed such that it throws
  * IllegalArgumentException when the Kafka Dataset is created, so that it can catch
  * missing options even before the query is started.
  */
 private[kafka010] class KafkaSourceProvider extends DataSourceRegister
-    with StreamSourceProvider
     with StreamSinkProvider
     with RelationProvider
     with CreatableRelationProvider
     with StreamWriteSupport
     with ContinuousReadSupport
+    with MicroBatchReadSupport
     with Logging {
   import KafkaSourceProvider._

   override def shortName(): String = "kafka"
   /**
-   * Returns the name and schema of the source. In addition, it also verifies whether the options
-   * are correct and sufficient to create the [[KafkaSource]] when the query is started.
+   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader]] to read batches
+   * of Kafka data in a micro-batch streaming query.
    */
-  override def sourceSchema(
-      sqlContext: SQLContext,
-      schema: Option[StructType],
-      providerName: String,
-      parameters: Map[String, String]): (String, StructType) = {
-    validateStreamOptions(parameters)
-    require(schema.isEmpty, "Kafka source has a fixed schema and cannot be set with a custom one")
-    (shortName(), KafkaOffsetReader.kafkaSchema)
-  }
-
-  override def createSource(
-      sqlContext: SQLContext,
+  def createMicroBatchReader(
--- End diff --
nit: `override`. Since this method implements the `MicroBatchReadSupport` interface, adding the modifier lets the compiler verify that the signature actually matches; see the sketch below.
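
A minimal, self-contained sketch of why the modifier helps. The trait and signatures below are simplified stand-ins for illustration only, not the actual Spark API (the real `createMicroBatchReader` takes a schema, a checkpoint location, and `DataSourceOptions`):

```scala
// Simplified stand-in for the real trait, for illustration only.
trait MicroBatchReadSupport {
  def createMicroBatchReader(checkpointLocation: String): String
}

class KafkaSourceProviderSketch extends MicroBatchReadSupport {
  // `override` is optional in Scala when implementing an abstract member,
  // but with it the compiler verifies that a matching member exists in a
  // supertype, so a typo in the name or a drifted parameter type is
  // rejected at compile time instead of silently declaring a new method.
  override def createMicroBatchReader(checkpointLocation: String): String =
    s"micro-batch reader checkpointing at $checkpointLocation"
}

object OverrideNitDemo extends App {
  println(new KafkaSourceProviderSketch().createMicroBatchReader("/tmp/ckpt"))
}
```

It also protects against future drift: if the trait's signature ever changes, an `override` member that no longer matches fails compilation immediately.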
---