gaborgsomogyi commented on a change in pull request #22138: [SPARK-25151][SS]
Apply Apache Commons Pool to KafkaDataConsumer
URL: https://github.com/apache/spark/pull/22138#discussion_r302980053
##########
File path:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
##########
@@ -421,17 +478,44 @@ private[kafka010] case class InternalKafkaConsumer(
}
}
- /** Create a new consumer and reset cached states */
- private def resetConsumer(): Unit = {
- consumer.close()
- consumer = createConsumer
- fetchedData.reset()
+ /**
+ * Poll messages from Kafka starting from `offset` and update `fetchedData`.
`fetchedData` may be
+ * empty if the Kafka consumer fetches some messages but none of them are
visible messages
+ * (either transaction messages, or aborted messages when `isolation.level`
is `read_committed`).
+ *
+ * @throws OffsetOutOfRangeException if `offset` is out of range.
+ * @throws TimeoutException if the consumer position is not changed after
polling. It means the
+ * consumer polls nothing before timeout.
+ */
+ private def fetchData(consumer: InternalKafkaConsumer, fetchedData:
FetchedData, offset: Long,
+ pollTimeoutMs: Long): Unit = {
+ val (records, offsetAfterPoll) = consumer.fetch(offset, pollTimeoutMs)
+ fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
+ }
+
+ private def getOrRetrieveConsumer(): InternalKafkaConsumer = _consumer match
{
+ case None =>
+ _consumer = Option(consumerPool.borrowObject(cacheKey, kafkaParams))
+ require(_consumer.isDefined, "borrowing consumer from pool must always
succeed.")
+ _consumer.get
+
+ case Some(consumer) => consumer
+ }
+
+ private def getOrRetrieveFetchedData(offset: Long): FetchedData =
_fetchedData match {
+ case None =>
+ _fetchedData = Option(fetchedDataPool.acquire(cacheKey, offset))
+ require(_fetchedData.isDefined, "acquiring fetched data from cache must
always succeed.")
+ _fetchedData.get
+
+ case Some(fetchedData) => fetchedData
}
/**
* Return an additional message including useful information and instructions.
*/
- private def additionalMessage(failOnDataLoss: Boolean): String = {
+ private def additionalMessage(topicPartition: TopicPartition, groupId:
String,
+ failOnDataLoss: Boolean): String = {
Review comment:
Nit: indent
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]