Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22009#discussion_r211628439
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala ---
    @@ -47,70 +46,49 @@ import org.apache.spark.sql.types.StructType
      *                       scenarios, where some offsets after the specified initial ones can't be
      *                       properly read.
      */
    -class KafkaContinuousReader(
    +class KafkaContinuousReadSupport(
         offsetReader: KafkaOffsetReader,
         kafkaParams: ju.Map[String, Object],
         sourceOptions: Map[String, String],
         metadataPath: String,
         initialOffsets: KafkaOffsetRangeLimit,
         failOnDataLoss: Boolean)
    -  extends ContinuousReader with Logging {
    -
    -  private lazy val session = SparkSession.getActiveSession.get
    -  private lazy val sc = session.sparkContext
    +  extends ContinuousReadSupport with Logging {
     
       private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
     
    -  // Initialized when creating reader factories. If this diverges from the partitions at the latest
    -  // offsets, we need to reconfigure.
    -  // Exposed outside this object only for unit tests.
    -  @volatile private[sql] var knownPartitions: Set[TopicPartition] = _
    --- End diff ---
    
    Moved to `KafkaContinuousScanConfig`
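
    For context, a minimal sketch of the idea (the local `ScanConfig` stand-in and the `startOffsets` field are assumptions for illustration, not the exact merged code; needs `kafka-clients` on the classpath):

    ```scala
    import org.apache.kafka.common.TopicPartition

    // Minimal stand-in for the v2 ScanConfig interface, only to keep the
    // sketch self-contained; the real trait lives in the Spark codebase.
    trait ScanConfig

    // The partition set is now fixed when the scan config is built, instead
    // of being @volatile mutable state on the reader.
    case class KafkaContinuousScanConfig(startOffsets: Map[TopicPartition, Long])
      extends ScanConfig {
      // If the partitions at the latest Kafka offsets diverge from this set,
      // the read support asks the engine to reconfigure.
      def knownPartitions: Set[TopicPartition] = startOffsets.keySet
    }
    ```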

