GitHub user koeninger commented on the issue:
https://github.com/apache/spark/pull/21690
@yuanboliu From reading the KafkaConsumer code, and from testing, I don't see
where consumer.position() alone would un-pause TopicPartitions. See the REPL session below.
Can you give a counter-example?
I am seeing poll() reset the paused state. When you hit the problem, are you
seeing the info-level log messages starting with "poll(0) returned messages"?
If that's what's happening, I think the best we can do is call pause() in
only one place, as the first line of paranoidPoll, e.g.:
```scala
c.pause(c.assignment)
val msgs = c.poll(0)
```
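To make the proposed ordering concrete, here is a minimal, self-contained Scala sketch. `TP`, `Rec` and `MiniConsumer` are hypothetical stand-ins for Kafka's `TopicPartition`, `ConsumerRecord` and the few `KafkaConsumer` methods involved, so it compiles without kafka-clients on the classpath, and `println` stands in for logging. The compensating seek to the smallest offset returned per partition is an assumption about how paranoidPoll handles the "poll(0) returned messages" case, not a copy of the Spark source.

```scala
// Hypothetical stand-ins for Kafka's TopicPartition and ConsumerRecord,
// so this sketch compiles without kafka-clients.
final case class TP(topic: String, partition: Int)
final case class Rec(tp: TP, offset: Long)

// Hypothetical minimal view of the consumer methods paranoidPoll relies on.
trait MiniConsumer {
  def assignment: Set[TP]
  def pause(tps: Set[TP]): Unit
  def poll(timeoutMs: Long): Seq[Rec]
  def seek(tp: TP, offset: Long): Unit
}

object ParanoidPollSketch {
  def paranoidPoll(c: MiniConsumer): Unit = {
    // Pause everything as the very first step, immediately before poll(0),
    // so no intervening call gets a chance to reset the paused state.
    c.pause(c.assignment)
    val msgs = c.poll(0)
    if (msgs.nonEmpty) {
      // Assumed compensation: poll(0) consumed records anyway, so rewind each
      // partition to the smallest offset it returned.
      val minOffsets: Map[TP, Long] =
        msgs.groupBy(_.tp).map { case (tp, recs) => tp -> recs.map(_.offset).min }
      minOffsets.foreach { case (tp, off) =>
        // println stands in for the info-level log call.
        println(s"poll(0) returned messages, seeking $tp to $off to compensate")
        c.seek(tp, off)
      }
    }
  }
}
```

Because pause() is the first statement, nothing between it and poll(0) can undo it; if records still come back, the seek rewinds each partition so those records are not skipped.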
Here's what I saw in testing:
```
scala> c.paused
res34: java.util.Set[org.apache.kafka.common.TopicPartition] = []
scala> c.assignment
res35: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]
scala> c.pause(topics)
scala> c.paused
res37: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]
scala> c.position(tp)
res38: Long = 248
scala> c.paused
res39: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]
scala> c.poll(0)
res40: org.apache.kafka.clients.consumer.ConsumerRecords[String,String] = org.apache.kafka.clients.consumer.ConsumerRecords@20d7efbe
scala> c.paused
res41: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]
scala> c.position(tp)
res42: Long = 248
scala> c.paused
res43: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]
scala> c.poll(1)
res44: org.apache.kafka.clients.consumer.ConsumerRecords[String,String] = org.apache.kafka.clients.consumer.ConsumerRecords@20d7efbe
scala> c.paused
res45: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]
scala> c.poll(100)
res46: org.apache.kafka.clients.consumer.ConsumerRecords[String,String] = org.apache.kafka.clients.consumer.ConsumerRecords@28e4439b
scala> c.paused
res47: java.util.Set[org.apache.kafka.common.TopicPartition] = []
```