gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-512703520

@zsxwing the main approach was created in agreement with the Kafka guys. The only thing that bothers me is why the following two approaches produce different results (maybe an edge case is hit with a zero timeout?!).

Works:
```
...
consumer.poll(jt.Duration.ZERO)
var partitions = consumer.assignment()
while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) {
  consumer.poll(jt.Duration.ofMillis(100))
  partitions = consumer.assignment()
}
...
```

Fails consistently, with no flakiness (worth mentioning that only a couple of tests fail, so maybe the tests are wrongly implemented):
```
...
var partitions = Set.empty[TopicPartition].asJava
while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) {
  consumer.poll(jt.Duration.ZERO)
  partitions = consumer.assignment()
  if (partitions.isEmpty) {
    Thread.sleep(100)
  }
}
...
```
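
For reference, the working variant can be packaged as a small helper that is easy to exercise without a broker. This is only a minimal sketch, not code from the PR: `AssignmentWait`, `pollUntilAssigned`, the `stepMs` parameter, and the function-typed `poll` and `assignment` arguments are hypothetical names introduced for illustration. It only reproduces the loop shape, where the waiting happens inside `poll` rather than in `Thread.sleep`.

```scala
import java.{time => jt, util => ju}

object AssignmentWait {
  /**
   * Hypothetical helper mirroring the "works" variant: one zero-timeout poll,
   * then polls of `stepMs` each until `assignment()` is non-empty or
   * `pollTimeoutMs` has elapsed. Returns the last observed assignment,
   * which may still be empty if the deadline was reached.
   */
  def pollUntilAssigned[T](
      poll: jt.Duration => Unit,
      assignment: () => ju.Set[T],
      pollTimeoutMs: Long,
      stepMs: Long = 100L): ju.Set[T] = {
    val startTimeMs = System.currentTimeMillis()
    // Initial non-blocking poll, as in the original snippet.
    poll(jt.Duration.ZERO)
    var partitions = assignment()
    while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) {
      // The wait happens inside poll itself, not in a separate sleep.
      poll(jt.Duration.ofMillis(stepMs))
      partitions = assignment()
    }
    partitions
  }
}
```

With a real consumer this would presumably be called as `AssignmentWait.pollUntilAssigned(d => consumer.poll(d), () => consumer.assignment(), pollTimeoutMs)`. As in the snippets above, any records returned by `poll` are discarded.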
