Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/20554#discussion_r167811474 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala --- @@ -28,50 +28,40 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext} -import org.apache.spark.sql.execution.streaming.{Sink, Source} +import org.apache.spark.sql.execution.streaming.Sink import org.apache.spark.sql.sources._ -import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, StreamWriteSupport} +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport} import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType /** - * The provider class for the [[KafkaSource]]. This provider is designed such that it throws + * The provider class for all Kafka readers and writers. It is designed such that it throws * IllegalArgumentException when the Kafka Dataset is created, so that it can catch * missing options even before the query is started. */ private[kafka010] class KafkaSourceProvider extends DataSourceRegister - with StreamSourceProvider with StreamSinkProvider with RelationProvider with CreatableRelationProvider with StreamWriteSupport with ContinuousReadSupport + with MicroBatchReadSupport with Logging { import KafkaSourceProvider._ override def shortName(): String = "kafka" /** - * Returns the name and schema of the source. In addition, it also verifies whether the options - * are correct and sufficient to create the [[KafkaSource]] when the query is started. 
+ * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader]] to read batches + * of Kafka data in a micro-batch streaming query. */ - override def sourceSchema( - sqlContext: SQLContext, - schema: Option[StructType], - providerName: String, - parameters: Map[String, String]): (String, StructType) = { - validateStreamOptions(parameters) - require(schema.isEmpty, "Kafka source has a fixed schema and cannot be set with a custom one") - (shortName(), KafkaOffsetReader.kafkaSchema) - } - - override def createSource( - sqlContext: SQLContext, + def createMicroBatchReader( --- End diff -- nit: please mark `createMicroBatchReader` with the `override` modifier, as the removed `createSource` was.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org