GitHub user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/20698#discussion_r171732729
--- Diff:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
---
@@ -320,28 +300,39 @@ private[kafka010] class KafkaMicroBatchReader(
}
/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch
streaming query. */
-private[kafka010] class KafkaMicroBatchDataReaderFactory(
- range: KafkaOffsetRange,
- preferredLoc: Option[String],
+private[kafka010] case class KafkaMicroBatchDataReaderFactory(
+ offsetRange: KafkaOffsetRange,
executorKafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
- failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {
+ failOnDataLoss: Boolean,
+ reuseKafkaConsumer: Boolean) extends DataReaderFactory[UnsafeRow] {
- override def preferredLocations(): Array[String] = preferredLoc.toArray
+ override def preferredLocations(): Array[String] =
offsetRange.preferredLoc.toArray
override def createDataReader(): DataReader[UnsafeRow] = new
KafkaMicroBatchDataReader(
- range, executorKafkaParams, pollTimeoutMs, failOnDataLoss)
+ offsetRange, executorKafkaParams, pollTimeoutMs, failOnDataLoss,
reuseKafkaConsumer)
}
/** A [[DataReader]] for reading Kafka data in a micro-batch streaming
query. */
-private[kafka010] class KafkaMicroBatchDataReader(
+private[kafka010] case class KafkaMicroBatchDataReader(
offsetRange: KafkaOffsetRange,
executorKafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
- failOnDataLoss: Boolean) extends DataReader[UnsafeRow] with Logging {
+ failOnDataLoss: Boolean,
+ reuseKafkaConsumer: Boolean) extends DataReader[UnsafeRow] with
Logging {
+
+ private val consumer = {
+ if (!reuseKafkaConsumer) {
+ // If we can't reuse CachedKafkaConsumers, creating a new
CachedKafkaConsumer. As here we
--- End diff --
`nit: We use 'assign' here, hence don't need to ...`
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]