Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/22138#discussion_r215579562
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -18,222 +18,247 @@
package org.apache.spark.sql.kafka010
import java.{util => ju}
+import java.io.Closeable
import java.util.concurrent.TimeoutException
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetOutOfRangeException}
import org.apache.kafka.common.TopicPartition
-import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, CacheKey, UNKNOWN_OFFSET}
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
+
+/**
+ * This class simplifies the usage of the Kafka consumer in the Spark SQL Kafka connector.
+ *
+ * NOTE: Like KafkaConsumer, this class is not thread-safe.
+ * NOTE for contributors: It is possible for the instance to be used from multiple callers,
+ * so all the methods should not rely on the current cursor and should use seek manually.
+ */
+private[kafka010] class InternalKafkaConsumer(
+ val topicPartition: TopicPartition,
+ val kafkaParams: ju.Map[String, Object]) extends Closeable with Logging {
+
+ val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+ private val consumer = createConsumer
-private[kafka010] sealed trait KafkaDataConsumer {
/**
- * Get the record for the given offset if available.
- *
- * If the record is invisible (either a
- * transaction message, or an aborted message when the consumer's `isolation.level` is
- * `read_committed`), it will be skipped and this method will try to fetch next available record
- * within [offset, untilOffset).
- *
- * This method also will try its best to detect data loss. If `failOnDataLoss` is `true`, it will
- * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this
- * method will try to fetch next available record within [offset, untilOffset).
- *
- * When this method tries to skip offsets due to either invisible messages or data loss and
- * reaches `untilOffset`, it will return `null`.
+ * Poll messages from Kafka starting from `offset` and return a pair of "list of consumer records"
+ * and "offset after poll". The list of consumer records may be empty if the Kafka consumer fetches
+ * some messages but none of them are visible messages (either transaction messages,
+ * or aborted messages when `isolation.level` is `read_committed`).
*
- * @param offset the offset to fetch.
- * @param untilOffset the max offset to fetch. Exclusive.
- * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
- * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at
- *                       offset if available, or throw exception.when `failOnDataLoss` is `false`,
- *                       this method will either return record at offset if available, or return
- *                       the next earliest available record less than untilOffset, or null. It
- *                       will not throw any exception.
+ * @throws OffsetOutOfRangeException if `offset` is out of range.
+ * @throws TimeoutException if the consumer position is not changed after polling. It means the
+ *                          consumer polls nothing before timeout.
*/
- def get(
- offset: Long,
- untilOffset: Long,
- pollTimeoutMs: Long,
- failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
- internalConsumer.get(offset, untilOffset, pollTimeoutMs, failOnDataLoss)
+ def fetch(offset: Long, pollTimeoutMs: Long)
+ : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
+ // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
+ seek(offset)
+ val p = consumer.poll(pollTimeoutMs)
+ val r = p.records(topicPartition)
+ logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
+ val offsetAfterPoll = consumer.position(topicPartition)
+ logDebug(s"Offset changed from $offset to $offsetAfterPoll after
polling")
+ val fetchedData = (r, offsetAfterPoll)
+ if (r.isEmpty) {
+ // We cannot fetch anything after `poll`. Three possible cases:
+ // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
+ //   be thrown.
+ // - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
+ // - Fetched something but none of them are visible. This is a valid case and we let the
+ //   caller handle this.
+ val range = getAvailableOffsetRange()
+ if (offset < range.earliest || offset >= range.latest) {
+ throw new OffsetOutOfRangeException(
+ Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
+ } else if (offset == offsetAfterPoll) {
+ throw new TimeoutException(
+ s"Cannot fetch record for offset $offset in $pollTimeoutMs
milliseconds")
+ }
+ }
+ fetchedData
}
/**
* Return the available offset range of the current partition. It's a pair of the earliest offset
* and the latest offset.
*/
- def getAvailableOffsetRange(): AvailableOffsetRange = internalConsumer.getAvailableOffsetRange()
+ def getAvailableOffsetRange(): AvailableOffsetRange = {
+ consumer.seekToBeginning(Set(topicPartition).asJava)
+ val earliestOffset = consumer.position(topicPartition)
+ consumer.seekToEnd(Set(topicPartition).asJava)
+ val latestOffset = consumer.position(topicPartition)
+ AvailableOffsetRange(earliestOffset, latestOffset)
+ }
- /**
- * Release this consumer from being further used. Depending on its implementation,
- * this consumer will be either finalized, or reset for reuse later.
- */
- def release(): Unit
+ override def close(): Unit = {
+ consumer.close()
+ }
- /** Reference to the internal implementation that this wrapper delegates to */
- protected def internalConsumer: InternalKafkaConsumer
-}
+ /** Create a KafkaConsumer to fetch records for `topicPartition` */
+ private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
+ val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
+ val tps = new ju.ArrayList[TopicPartition]()
+ tps.add(topicPartition)
+ c.assign(tps)
+ c
+ }
+ private def seek(offset: Long): Unit = {
+ logDebug(s"Seeking to $groupId $topicPartition $offset")
+ consumer.seek(topicPartition, offset)
+ }
+}
/**
- * A wrapper around Kafka's KafkaConsumer that throws error when data loss is detected.
- * This is not for direct use outside this file.
+ * The internal object to store the fetched data from Kafka consumer and the next offset to poll.
+ *
+ * @param _records the pre-fetched Kafka records.
+ * @param _nextOffsetInFetchedData the next offset in `records`. We use this to verify if we
+ *                                 should check if the pre-fetched data is still valid.
+ * @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to
+ *                         poll when `records` is drained.
*/
-private[kafka010] case class InternalKafkaConsumer(
- topicPartition: TopicPartition,
- kafkaParams: ju.Map[String, Object]) extends Logging {
- import InternalKafkaConsumer._
-
- /**
- * The internal object to store the fetched data from Kafka consumer and the next offset to poll.
- *
- * @param _records the pre-fetched Kafka records.
- * @param _nextOffsetInFetchedData the next offset in `records`. We use this to verify if we
- *                                 should check if the pre-fetched data is still valid.
- * @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to
- *                         poll when `records` is drained.
- */
- private case class FetchedData(
- private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
- private var _nextOffsetInFetchedData: Long,
- private var _offsetAfterPoll: Long) {
-
- def withNewPoll(
- records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
- offsetAfterPoll: Long): FetchedData = {
- this._records = records
- this._nextOffsetInFetchedData = UNKNOWN_OFFSET
- this._offsetAfterPoll = offsetAfterPoll
- this
- }
-
- /** Whether there are more elements */
- def hasNext: Boolean = _records.hasNext
-
- /** Move `records` forward and return the next record. */
- def next(): ConsumerRecord[Array[Byte], Array[Byte]] = {
- val record = _records.next()
- _nextOffsetInFetchedData = record.offset + 1
- record
- }
+private[kafka010] case class FetchedData(
--- End diff ---
Now I see how the seeking issue is solved, good. I think it's highly welcome and essential in terms of performance, so it would be good to cover it in at least one test (I made a toy test and in the logs I can see it works). Something like:
```
createTopicWithOnePartition
get(0)
assert seek
get(500)
assert seek
get(1)
assert no seek
```
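To make that a bit more concrete, here is a rough ScalaTest-style sketch of the idea (not tied to the actual suite). Everything in it is illustrative: `newTopic`/`testUtils` stand in for the usual Kafka test helpers, `SeekCountingConsumer` is a hypothetical wrapper that would need a test-only hook in `InternalKafkaConsumer.createConsumer` to be injected, and the offsets just mirror the intent above (sequential reads should be served from the already fetched data, while jumps should trigger a new seek):
```
import java.{util => ju}

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Hypothetical wrapper that counts seek() calls so the test can assert on them.
class SeekCountingConsumer(params: ju.Map[String, Object])
  extends KafkaConsumer[Array[Byte], Array[Byte]](params) {
  @volatile var seekCount = 0
  override def seek(tp: TopicPartition, offset: Long): Unit = {
    seekCount += 1
    super.seek(tp, offset)
  }
}

test("sequential reads do not re-seek the Kafka consumer") {
  val topic = newTopic()                        // hypothetical helper
  testUtils.createTopic(topic, partitions = 1)  // hypothetical helper
  testUtils.sendMessages(topic, (0 until 1000).map(_.toString).toArray)

  // `consumer` is assumed to be a KafkaDataConsumer for (topic, partition 0) whose
  // underlying KafkaConsumer is a SeekCountingConsumer; wiring depends on the test hook.
  consumer.get(0, 1000, pollTimeoutMs = 10000, failOnDataLoss = false)
  assert(seekCount == 1)   // the first read always seeks

  consumer.get(500, 1000, pollTimeoutMs = 10000, failOnDataLoss = false)
  assert(seekCount == 2)   // jumping ahead invalidates the fetched data, so it seeks again

  consumer.get(501, 1000, pollTimeoutMs = 10000, failOnDataLoss = false)
  assert(seekCount == 2)   // the next sequential offset comes from the fetched data, no seek
}
```
Counting seeks via a wrapper is only one option; asserting on the debug logs (as in the toy test) would work too.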
---