Github user ofir-manor commented on a diff in the pull request:
https://github.com/apache/spark/pull/15504#discussion_r83546052
--- Diff:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
---
@@ -232,6 +232,42 @@ private[kafka010] case class KafkaSource(
override def toString(): String = s"KafkaSource[$consumerStrategy]"
/**
+ * Set consumer position to specified offsets, making sure all assignments are set.
+ */
+ private def fetchSpecificStartingOffsets(
+ partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] =
+ withRetriesWithoutInterrupt {
+ // Poll to get the latest assigned partitions
+ consumer.poll(0)
+ val partitions = consumer.assignment()
+ consumer.pause(partitions)
+ assert(partitions.asScala == partitionOffsets.keySet,
+ "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
+ "Use -1 for latest, -2 for earliest, if you don't care.\n" +
--- End diff --
The phrase `if you don't care` reads as a bit judgmental. Maybe replace that sentence with
`You can also use -1 for latest, -2 for earliest.`
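For illustration, here is a minimal sketch of how the assertion might read with the proposed wording. It is not the code from the pull request: `TopicPartition` is a hypothetical stand-in for the Kafka class so the snippet compiles on its own, `StartingOffsetsCheck` and `requireAllPartitions` are made-up names, and the rest of the message, which is cut off in the quoted diff, is left out rather than guessed.

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition,
// so the sketch compiles without the Kafka client on the classpath.
final case class TopicPartition(topic: String, partition: Int)

object StartingOffsetsCheck {
  // First sentence copied from the diff; second sentence uses the proposed wording.
  val message: String =
    "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
    "You can also use -1 for latest, -2 for earliest.\n"

  // Hypothetical helper: throws AssertionError unless the supplied offsets
  // cover exactly the set of assigned partitions.
  def requireAllPartitions(
      assigned: Set[TopicPartition],
      partitionOffsets: Map[TopicPartition, Long]): Unit =
    assert(assigned == partitionOffsets.keySet, message)
}
```

Only the second sentence of the message changes; the check itself (assigned partitions must equal the key set of the supplied offsets) is the same comparison shown in the diff.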