Github user koeninger commented on the issue:

    https://github.com/apache/spark/pull/21690
  
    @yuanboliu From reading the KafkaConsumer code, and from testing, I don't see where consumer.position() alone would un-pause TopicPartitions. See below. Can you give a counterexample?
    
    I am seeing poll() reset the paused state. When you are having the problem, are you seeing the info-level log messages "poll(0) returned messages"?
    
    If that's what's happening, I think the best we can do is call pause() in only one place, the first line of paranoidPoll, e.g.:
    
```scala
c.pause(c.assignment)
val msgs = c.poll(0)
```
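    To make the proposal concrete, here is a minimal, self-contained sketch of a paranoidPoll that pauses in exactly one place, right before the poll. It does not use kafka-clients: TP, Rec, MiniConsumer and FakeConsumer are hypothetical stand-ins for TopicPartition, ConsumerRecord and the few KafkaConsumer methods involved. The seek back to the smallest returned offset is an assumption about how the "poll(0) returned messages" case gets compensated, not a quote of the Spark code. It is written to be pasted into the Scala REPL or run as a script.

```scala
// Hypothetical stand-ins so the sketch compiles without kafka-clients.
final case class TP(topic: String, partition: Int)
final case class Rec(topic: String, partition: Int, offset: Long)

// Hypothetical slice of the KafkaConsumer API that paranoidPoll touches.
trait MiniConsumer {
  def assignment: Set[TP]
  def pause(tps: Set[TP]): Unit
  def poll(timeoutMs: Long): Seq[Rec]
  def seek(tp: TP, offset: Long): Unit
}

def paranoidPoll(c: MiniConsumer): Unit = {
  // Pause immediately before polling: a pause() issued earlier
  // may already have been undone by an intervening poll().
  c.pause(c.assignment)
  val msgs = c.poll(0)
  // Assumption: if records come back anyway, rewind each partition to the
  // smallest offset returned so that no record is skipped.
  msgs.groupBy(r => TP(r.topic, r.partition)).foreach { case (tp, rs) =>
    c.seek(tp, rs.map(_.offset).min)
  }
}

// Fake consumer whose poll() ignores pausing and "leaks" two records,
// purely to exercise the seek-back path above.
class FakeConsumer extends MiniConsumer {
  var pausedSet: Set[TP] = Set.empty
  var seeks: Map[TP, Long] = Map.empty
  def assignment: Set[TP] = Set(TP("test", 0))
  def pause(tps: Set[TP]): Unit = pausedSet ++= tps
  def poll(timeoutMs: Long): Seq[Rec] = Seq(Rec("test", 0, 250L), Rec("test", 0, 248L))
  def seek(tp: TP, offset: Long): Unit = seeks += (tp -> offset)
}

val fake = new FakeConsumer
paranoidPoll(fake)
println(fake.seeks) // Map(TP(test,0) -> 248)
```

    Keeping the pause() call adjacent to the poll(0) means there is no window in which some other poll() can clear the paused state before it matters.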
    
    
    
    Here's what I saw in testing:
    
```scala
scala> c.paused
res34: java.util.Set[org.apache.kafka.common.TopicPartition] = []

scala> c.assignment
res35: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]

scala> c.pause(topics)

scala> c.paused
res37: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]

scala> c.position(tp)
res38: Long = 248

scala> c.paused
res39: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]

scala> c.poll(0)
res40: org.apache.kafka.clients.consumer.ConsumerRecords[String,String] = org.apache.kafka.clients.consumer.ConsumerRecords@20d7efbe

scala> c.paused
res41: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]

scala> c.position(tp)
res42: Long = 248

scala> c.paused
res43: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]

scala> c.poll(1)
res44: org.apache.kafka.clients.consumer.ConsumerRecords[String,String] = org.apache.kafka.clients.consumer.ConsumerRecords@20d7efbe

scala> c.paused
res45: java.util.Set[org.apache.kafka.common.TopicPartition] = [test-0]

scala> c.poll(100)
res46: org.apache.kafka.clients.consumer.ConsumerRecords[String,String] = org.apache.kafka.clients.consumer.ConsumerRecords@28e4439b

scala> c.paused
res47: java.util.Set[org.apache.kafka.common.TopicPartition] = []
```
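    The check in the transcript can be repeated against other client versions with a small helper that snapshots the paused set around each call. This is a hypothetical sketch (tracePaused is not part of Kafka or Spark); it takes the snapshot function as a parameter so it runs without kafka-clients, and the mutable set below only simulates the observations in the transcript. Against a real consumer, the snapshot could be `() => c.paused.asScala.toSet`, with `scala.collection.JavaConverters._` (Scala 2.12) or `scala.jdk.CollectionConverters._` (Scala 2.13) imported.

```scala
import scala.collection.mutable

// Hypothetical helper: run each labelled step and report whether the
// paused set differs before and after it.
def tracePaused[A](paused: () => Set[A])(steps: (String, () => Any)*): Seq[(String, Boolean)] =
  steps.map { case (label, op) =>
    val before = paused() // immutable snapshot taken before the step
    op()
    label -> (paused() != before)
  }

// Simulated state: position() leaves it alone, a later poll() clears it.
val state = mutable.Set("test-0")
val snapshot: () => Set[String] = () => state.toSet
val report = tracePaused(snapshot)(
  "position(tp)" -> (() => ()),
  "poll(100)" -> (() => state.clear())
)
report.foreach { case (label, changed) => println(s"$label changed paused set: $changed") }
```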

