GitHub user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20698#discussion_r171732729
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala ---
    @@ -320,28 +300,39 @@ private[kafka010] class KafkaMicroBatchReader(
     }
     
     /** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
    -private[kafka010] class KafkaMicroBatchDataReaderFactory(
    -    range: KafkaOffsetRange,
    -    preferredLoc: Option[String],
    +private[kafka010] case class KafkaMicroBatchDataReaderFactory(
    +    offsetRange: KafkaOffsetRange,
         executorKafkaParams: ju.Map[String, Object],
         pollTimeoutMs: Long,
    -    failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {
    +    failOnDataLoss: Boolean,
    +    reuseKafkaConsumer: Boolean) extends DataReaderFactory[UnsafeRow] {
     
    -  override def preferredLocations(): Array[String] = preferredLoc.toArray
    +  override def preferredLocations(): Array[String] = offsetRange.preferredLoc.toArray
     
       override def createDataReader(): DataReader[UnsafeRow] = new KafkaMicroBatchDataReader(
    -    range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
    +    offsetRange, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
     }
     
     /** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
    -private[kafka010] class KafkaMicroBatchDataReader(
    +private[kafka010] case class KafkaMicroBatchDataReader(
         offsetRange: KafkaOffsetRange,
         executorKafkaParams: ju.Map[String, Object],
         pollTimeoutMs: Long,
    -    failOnDataLoss: Boolean) extends DataReader[UnsafeRow] with Logging {
    +    failOnDataLoss: Boolean,
    +    reuseKafkaConsumer: Boolean) extends DataReader[UnsafeRow] with Logging {
    +
    +  private val consumer = {
    +    if (!reuseKafkaConsumer) {
    +      // If we can't reuse CachedKafkaConsumers, creating a new CachedKafkaConsumer. As here we
    --- End diff ---
    
    `nit: We use 'assign' here, hence don't need to ...`
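    
    For context on the `assign` remark: with `KafkaConsumer.assign()` the consumer is pinned to explicit partitions and never joins a consumer group, so no subscription or rebalance handling is involved, and a task that cannot reuse a cached consumer can simply create a fresh one for its offset range. A minimal sketch of that pattern, with placeholder broker, topic, and offsets (none of these values come from the PR):
    
        import java.{util => ju}
        import org.apache.kafka.clients.consumer.KafkaConsumer
        import org.apache.kafka.common.TopicPartition
    
        // Illustrative only: read a fixed offset range with assign(); no group membership.
        val kafkaParams = new ju.HashMap[String, Object]()
        kafkaParams.put("bootstrap.servers", "localhost:9092")  // assumed broker
        kafkaParams.put("key.deserializer",
          "org.apache.kafka.common.serialization.ByteArrayDeserializer")
        kafkaParams.put("value.deserializer",
          "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    
        val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
        val tp = new TopicPartition("some-topic", 0)        // assumed topic/partition
        consumer.assign(ju.Arrays.asList(tp))               // explicit assignment, no subscribe()
        consumer.seek(tp, 100L)                             // start at the range's fromOffset
        val records = consumer.poll(512L)                   // bounded poll, as with pollTimeoutMs
        consumer.close()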

