HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#discussion_r303804331
##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
##########
@@ -419,6 +416,19 @@ private[kafka010] class KafkaOffsetReader(
     stopConsumer()
     _consumer = null  // will automatically get reinitialized again
   }
+
+  private def getPartitions(): ju.Set[TopicPartition] = {
+    var partitions = Set.empty[TopicPartition].asJava
+    val startTimeMs = System.currentTimeMillis()
+    while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) {
+      // Poll to get the latest assigned partitions
+      consumer.poll(jt.Duration.ofMillis(100))

Review comment:
Please correct me if I'm missing something here. (I'm not a Kafka expert, so I may be missing some details.)

This call may not return until the full 100 ms has elapsed when there are no records to consume, even if the metadata is already available. Here we only need the metadata, but once we call `poll`, the wait is tied to fetching records.

To be clear, ideally we would call `poll(0)` and sleep explicitly (to avoid coupling with records), but it would also be OK to let `consumer.poll` wait instead.
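
For illustration only, the "`poll(0)` plus explicit sleep" alternative mentioned above could be sketched roughly as below. This is not code from the PR: `PollUntilAssigned`, `awaitAssignment`, `pollOnce`, `assignment`, and `sleepMs` are hypothetical names, and the consumer is abstracted behind two functions so the sketch runs without a Kafka dependency. The Kafka calls shown in the comments (`consumer.poll(java.time.Duration.ZERO)` and `consumer.assignment()`) are an assumed wiring, not something the PR prescribes.

```scala
import scala.annotation.tailrec

object PollUntilAssigned {
  /**
   * Hypothetical helper (not part of Spark or Kafka).
   * Performs a non-blocking poll, checks the current assignment, and sleeps
   * between attempts until partitions appear or the timeout expires.
   */
  def awaitAssignment[T](
      pollOnce: () => Unit,      // assumed wiring: () => consumer.poll(java.time.Duration.ZERO)
      assignment: () => Set[T],  // assumed wiring: () => consumer.assignment().asScala.toSet
      timeoutMs: Long,
      sleepMs: Long = 10L): Set[T] = {
    val deadline = System.currentTimeMillis() + timeoutMs

    @tailrec
    def loop(): Set[T] = {
      pollOnce()                 // the poll itself is not expected to wait for records
      val assigned = assignment()
      if (assigned.nonEmpty || System.currentTimeMillis() >= deadline) {
        assigned                 // either we have partitions, or we gave up (possibly empty)
      } else {
        Thread.sleep(sleepMs)    // the explicit sleep replaces the wait inside poll
        loop()
      }
    }

    loop()
  }
}
```

The point of the design is that the waiting is controlled by the caller's sleep rather than by whether records happen to be available. The trade-off is an extra loop and a chosen sleep interval, which is why letting `consumer.poll` wait is also acceptable.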