Github user zsxwing commented on a diff in the pull request:
    --- Diff: 
    @@ -28,50 +28,40 @@ import 
org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
     import org.apache.spark.internal.Logging
     import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, 
SparkSession, SQLContext}
    -import org.apache.spark.sql.execution.streaming.{Sink, Source}
    +import org.apache.spark.sql.execution.streaming.Sink
     import org.apache.spark.sql.sources._
    -import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceOptions, StreamWriteSupport}
    +import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
     import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
     import org.apache.spark.sql.streaming.OutputMode
     import org.apache.spark.sql.types.StructType
    - * The provider class for the [[KafkaSource]]. This provider is designed 
such that it throws
    + * The provider class for all Kafka readers and writers. It is designed 
such that it throws
      * IllegalArgumentException when the Kafka Dataset is created, so that it 
can catch
      * missing options even before the query is started.
     private[kafka010] class KafkaSourceProvider extends DataSourceRegister
    -    with StreamSourceProvider
         with StreamSinkProvider
         with RelationProvider
         with CreatableRelationProvider
         with StreamWriteSupport
         with ContinuousReadSupport
    +    with MicroBatchReadSupport
         with Logging {
       import KafkaSourceProvider._
       override def shortName(): String = "kafka"
    -   * Returns the name and schema of the source. In addition, it also 
verifies whether the options
    -   * are correct and sufficient to create the [[KafkaSource]] when the 
query is started.
    +   * Creates a 
[[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader]] to read 
    +   * batches of Kafka data in a micro-batch streaming query.
    -  override def sourceSchema(
    -      sqlContext: SQLContext,
    -      schema: Option[StructType],
    -      providerName: String,
    -      parameters: Map[String, String]): (String, StructType) = {
    -    validateStreamOptions(parameters)
    -    require(schema.isEmpty, "Kafka source has a fixed schema and cannot be 
set with a custom one")
    -    (shortName(), KafkaOffsetReader.kafkaSchema)
    -  }
    -  override def createSource(
    -      sqlContext: SQLContext,
    +  def createMicroBatchReader(
    --- End diff --
    nit: `override`


To unsubscribe, e-mail:
For additional commands, e-mail:

Reply via email to