HeartSaVioR commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-514048478
 
 
   Here's part of the test code Kafka uses with the new `poll` API.
   
   
https://github.com/apache/kafka/blob/f98e176746d663fadedbcd3c18312a7f476a20c8/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L1748-L1752
   
   ```scala
     private def awaitAssignment(consumer: Consumer[_, _], expectedAssignment: Set[TopicPartition]): Unit = {
       TestUtils.pollUntilTrue(consumer, () => consumer.assignment() == expectedAssignment.asJava,
         s"Timed out while awaiting expected assignment $expectedAssignment. " +
           s"The current assignment is ${consumer.assignment()}")
     }
   ```
   
   
https://github.com/apache/kafka/blob/f98e176746d663fadedbcd3c18312a7f476a20c8/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L767-L775
   
   ```scala
     def pollUntilTrue(consumer: Consumer[_, _],
                       action: () => Boolean,
                       msg: => String,
                       waitTimeMs: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Unit = {
       waitUntilTrue(() => {
         consumer.poll(Duration.ofMillis(50))
         action()
       }, msg = msg, pause = 0L, waitTimeMs = waitTimeMs)
     }
   ```
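For readers unfamiliar with the pattern, here is a minimal stand-alone sketch of the same poll-then-check loop. `PollUntil.pollUntil` is a hypothetical helper that is not part of Kafka or Spark. `pollOnce` stands in for `consumer.poll(Duration)`. Unlike Kafka's `pollUntilTrue`, it returns `false` on timeout instead of failing with a message.

```scala
import java.time.Duration

// Hypothetical helper modeled on Kafka's TestUtils.pollUntilTrue; it is not
// part of Kafka or Spark. `pollOnce` stands in for consumer.poll(Duration).
object PollUntil {
  def pollUntil(
      pollOnce: Duration => Unit,
      condition: () => Boolean,
      pollTimeout: Duration = Duration.ofMillis(50),
      maxWaitMs: Long = 15000L): Boolean = {
    val deadlineNanos = System.nanoTime() + maxWaitMs * 1000000L
    var satisfied = false
    // Poll first, then check the condition, until it holds or time runs out.
    while (!satisfied && System.nanoTime() - deadlineNanos < 0) {
      pollOnce(pollTimeout) // a real consumer may update metadata here
      satisfied = condition()
    }
    satisfied // false means the deadline passed before the condition held
  }
}
```

The short per-poll timeout keeps each iteration cheap, while `maxWaitMs` bounds the total wait.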
   
   Some of Kafka's test code still relies on the deprecated `poll(0)`, so both `poll(Duration)` and `poll(long)` are in use. There may be no technical reason for this, but the fact that they still rely on the old flavor might indicate that there is a real need for `poll(0)`'s behavior.
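The difference between the two overloads matters here. As far as I can tell from the Kafka 2.x client, the deprecated `poll(long)` waits for assignment metadata without bound (it calls `updateAssignmentMetadataIfNeeded` with a `Long.MAX_VALUE` timer) and applies the timeout only to fetching. In contrast, `poll(Duration)` (KIP-266) counts the metadata wait against the timeout. The toy model below is hypothetical: `ToyConsumer` and its methods are made up, time is simulated, and each call returns whether the assignment is known instead of returning records. It only illustrates why `poll(0)` and `poll(Duration.ZERO)` are not interchangeable.

```scala
import java.time.Duration

// Toy model (hypothetical, not Kafka code) of poll(long) vs poll(Duration).
// Time is simulated; each method returns whether the assignment metadata is
// known after the call. Fetching records is not modeled.
final class ToyConsumer(metadataReadyAtMs: Long) {
  private var clockMs = 0L
  def nowMs: Long = clockMs
  def assignmentKnown: Boolean = clockMs >= metadataReadyAtMs

  // Deprecated poll(long): blocks until metadata is available no matter how
  // small the timeout is; the timeout would only apply to fetching records.
  def pollLong(timeoutMs: Long): Boolean = {
    if (!assignmentKnown) clockMs = metadataReadyAtMs
    assignmentKnown
  }

  // poll(Duration): the timeout bounds the whole call, including the wait for
  // metadata, so a zero timeout may return before the assignment is known.
  def pollDuration(timeout: Duration): Boolean = {
    if (!assignmentKnown)
      clockMs = math.min(metadataReadyAtMs, clockMs + timeout.toMillis)
    assignmentKnown
  }
}
```

With metadata arriving at 200 ms, `pollLong(0L)` returns with the assignment known after blocking the full 200 ms. `pollDuration(Duration.ZERO)` returns immediately without it, and four 50 ms polls are needed to reach it.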
   
   In some places Kafka calls `updateAssignmentMetadataIfNeeded` directly. This method handles the metadata update inside `poll()`. Here it is called with a `Long.MAX_VALUE` timer, so it effectively blocks until metadata is available. The method is meant for testing and is package-private.
   
   ```
   consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
   ```
   
   In many of the test cases that call `poll(Duration.ZERO)`, `updateAssignmentMetadataIfNeeded` is called first. In the remaining cases, the verification code just seems to confirm that calling `poll` does nothing or only returns records that were already fetched.
   
   I guess in our case we need to choose between two options. The first is to call `updateAssignmentMetadataIfNeeded` so that it deals only with metadata. This may require a hack, and Kafka has made clear the method is for testing, so relying on it is unsafe. The second is to call `poll` with a small timeout (e.g. 50ms) and tolerate the case where no records are available to pull. In that case the timeout adds to latency regardless of whether metadata is already available.
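Here is a minimal sketch of the second option. It assumes a hypothetical `PollingConsumer` trait in place of the real `org.apache.kafka.clients.consumer.Consumer`, so it runs without a broker. The sketch polls with a short timeout until the assignment is non-empty, bounded by a maximum number of polls. Keeping any records returned along the way is a design choice of this sketch, not something the discussion above settles. A caller could instead discard them and `seek` to the desired offsets.

```scala
import java.time.Duration
import scala.collection.mutable.ArrayBuffer

// Hypothetical minimal view of a consumer; a real implementation would wrap
// org.apache.kafka.clients.consumer.Consumer instead.
trait PollingConsumer[R] {
  def poll(timeout: Duration): Seq[R]
  def assignment(): Set[String]
}

object AwaitAssignment {
  // Polls with a short timeout until the assignment is non-empty, buffering
  // any records returned meanwhile. Returns None if no assignment appeared
  // within maxPolls polls.
  def awaitAssignment[R](
      consumer: PollingConsumer[R],
      pollTimeout: Duration,
      maxPolls: Int): Option[(Set[String], List[R])] = {
    val buffered = ArrayBuffer.empty[R]
    var polls = 0
    while (consumer.assignment().isEmpty && polls < maxPolls) {
      buffered ++= consumer.poll(pollTimeout) // may be empty: that is tolerated
      polls += 1
    }
    val assigned = consumer.assignment()
    if (assigned.isEmpty) None else Some((assigned, buffered.toList))
  }
}
```

Each iteration can block for up to `pollTimeout` when no records arrive. Assuming the consumer honors the timeout, the wait is therefore bounded by roughly `maxPolls * pollTimeout`.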
   
   By the way, KIP-288 proposed a new public API, `waitForAssignment`, similar to `updateAssignmentMetadataIfNeeded`. However, KIP-288 was discarded because KIP-266 superseded it, and KIP-266 ultimately didn't add that method. I'm not sure whether it was declined or simply missed.
   
https://cwiki.apache.org/confluence/display/KAFKA/KIP-288%3A+%5BDISCARDED%5D+Consumer.poll%28%29+timeout+semantic+change+and+new+waitForAssignment+method

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
