Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/14026#discussion_r70158281
--- Diff:
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
---
@@ -79,8 +81,71 @@ private case class Subscribe[K, V](
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]):
Consumer[K, V] = {
val consumer = new KafkaConsumer[K, V](kafkaParams)
consumer.subscribe(topics)
- if (currentOffsets.isEmpty) {
- offsets.asScala.foreach { case (topicPartition, offset) =>
+ val toSeek = if (currentOffsets.isEmpty) {
+ offsets
+ } else {
+ currentOffsets
--- End diff --
I guess it wasn't designed for rapidly seeking back and forth. So, to confirm
my understanding, these are the calls made in Subscribe:
- poll - to get the initial assignments; this also handles the none case of no
committed offset
- seek - to set the position to whatever is known from the user-provided offsets
or the current offsets, so that later regular polls in the DStream can proceed
with as little difficulty as possible.
Right?
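
The call order described above can be sketched as follows. This is only an illustration of the sequence under discussion, not the code in the pull request: `SeekableConsumer`, `RecordingConsumer`, `SubscribeSketch`, and the local `TopicPartition` case class are hypothetical stand-ins so that the sketch runs without a Kafka client on the classpath, and the unconditional `poll(0)` is an assumption.

```scala
// Hypothetical stand-in for Kafka's TopicPartition (assumption, not the real class).
final case class TopicPartition(topic: String, partition: Int)

// Hypothetical interface modelling only the three consumer calls discussed here.
trait SeekableConsumer {
  def subscribe(topics: Seq[String]): Unit
  def poll(timeoutMs: Long): Unit
  def seek(tp: TopicPartition, offset: Long): Unit
}

// Records every call in order so the sequence can be inspected.
final class RecordingConsumer extends SeekableConsumer {
  val calls = scala.collection.mutable.ArrayBuffer.empty[String]
  def subscribe(topics: Seq[String]): Unit =
    calls += s"subscribe(${topics.mkString(",")})"
  def poll(timeoutMs: Long): Unit =
    calls += s"poll($timeoutMs)"
  def seek(tp: TopicPartition, offset: Long): Unit =
    calls += s"seek(${tp.topic}-${tp.partition}, $offset)"
}

object SubscribeSketch {
  // Mirrors the diff: use current offsets when present, else the user-provided ones.
  def chooseOffsets(
      current: Map[TopicPartition, Long],
      userProvided: Map[TopicPartition, Long]): Map[TopicPartition, Long] =
    if (current.isEmpty) userProvided else current

  def onStart(
      consumer: SeekableConsumer,
      topics: Seq[String],
      current: Map[TopicPartition, Long],
      userProvided: Map[TopicPartition, Long]): Unit = {
    consumer.subscribe(topics)
    // Assumed step: one poll so the consumer obtains its initial assignments.
    consumer.poll(0L)
    // Then position every known partition before the regular polls begin.
    chooseOffsets(current, userProvided).foreach { case (tp, offset) =>
      consumer.seek(tp, offset)
    }
  }

  def main(args: Array[String]): Unit = {
    val consumer = new RecordingConsumer
    onStart(consumer, Seq("events"), Map.empty, Map(TopicPartition("events", 0) -> 42L))
    consumer.calls.foreach(println)
  }
}
```

With empty current offsets and a single user-provided offset, the recorded order is subscribe, then poll, then one seek, which is the sequence the question above asks to confirm.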