GitHub user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r167811474
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
    @@ -28,50 +28,40 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
     
     import org.apache.spark.internal.Logging
     import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
    -import org.apache.spark.sql.execution.streaming.{Sink, Source}
    +import org.apache.spark.sql.execution.streaming.Sink
     import org.apache.spark.sql.sources._
    -import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, StreamWriteSupport}
    +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
     import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
     import org.apache.spark.sql.streaming.OutputMode
     import org.apache.spark.sql.types.StructType
     
     /**
    - * The provider class for the [[KafkaSource]]. This provider is designed such that it throws
    + * The provider class for all Kafka readers and writers. It is designed such that it throws
      * IllegalArgumentException when the Kafka Dataset is created, so that it can catch
      * missing options even before the query is started.
      */
     private[kafka010] class KafkaSourceProvider extends DataSourceRegister
    -    with StreamSourceProvider
         with StreamSinkProvider
         with RelationProvider
         with CreatableRelationProvider
         with StreamWriteSupport
         with ContinuousReadSupport
    +    with MicroBatchReadSupport
         with Logging {
       import KafkaSourceProvider._
     
       override def shortName(): String = "kafka"
     
       /**
    -   * Returns the name and schema of the source. In addition, it also verifies whether the options
    -   * are correct and sufficient to create the [[KafkaSource]] when the query is started.
    +   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader]] to read batches
    +   * of Kafka data in a micro-batch streaming query.
        */
    -  override def sourceSchema(
    -      sqlContext: SQLContext,
    -      schema: Option[StructType],
    -      providerName: String,
    -      parameters: Map[String, String]): (String, StructType) = {
    -    validateStreamOptions(parameters)
    -    require(schema.isEmpty, "Kafka source has a fixed schema and cannot be set with a custom one")
    -    (shortName(), KafkaOffsetReader.kafkaSchema)
    -  }
    -
    -  override def createSource(
    -      sqlContext: SQLContext,
    +  def createMicroBatchReader(
    --- End diff --
    
    nit: `override`
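    
    (For context: Scala does not require `override` when implementing a purely
    abstract member such as `MicroBatchReadSupport.createMicroBatchReader`,
    but adding it asks the compiler to verify that the method really matches
    the trait's declaration. A minimal self-contained sketch, with hypothetical
    names standing in for the real API:)
    
    ```scala
    trait ReadSupport {
      def createReader(path: String): String
    }
    
    class MySource extends ReadSupport {
      // With a purely abstract member, omitting `override` still compiles,
      // but if the trait later gains a default implementation, a signature
      // drift here would silently become an overload instead of an override.
      // Writing `override` makes the compiler flag any mismatch directly.
      override def createReader(path: String): String = s"reader for $path"
    }
    ```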

