Github user sidhavratha commented on the issue: https://github.com/apache/spark/pull/21685

Our Kafka team has resolved the issue regarding the 40 sec poll delay, which turned out to be due to some faulty hardware. However, these changes still make sense for getting better throughput per batch. As you know, Kafka already pre-fetches records, but it pre-fetches only one additional poll. If a batch requires a number of records that takes many polls to fetch, processing will block on the Kafka poll. This blocking poll can be avoided, and better throughput achieved, if we maintain a configured buffer on the executors.

Attaching 2 files:
* kafka_streaming_without_buffer.pdf : the streaming job without a buffer is able to achieve ~50K throughput per batch
* kafka_streaming_with_buffer.pdf : the streaming job with a buffer is able to maintain ~70K throughput per batch

For my test job:
* Batch duration = 10 sec
* Kafka topic partitions = 15
* Cores = 5
* Average processing time per record = 0.7 ms

Hence:

Expected number of records per batch = (Batch duration * Kafka topic partitions) / (Ceil(Kafka partitions / Cores) * Time per record) = (10 * 1000 * 15) / (Ceil(15 / 5) * 0.7) = 150000 / 2.1 = 71428.57 = ~71K

As per this calculation, ~71K is the maximum throughput that can be achieved. Without the buffer, throughput is lower because some time is wasted on blocked Kafka polls.

In this PR I am making the Kafka polling strategy configurable, so that it is also possible to plug in an external strategy. The class name used to create the Kafka consumer can be set via the "spark.streaming.kafka.consumer.builder.name" parameter.

A few points which you asked about last time:
* Tests with receivers : this change is not applicable to receivers; the changes are only for the direct approach.
* Guarantees when the driver dies : I have tested the scenario where the driver dies. Since the newly started driver resumes from the last committed offset, it works as expected and does not lose any data.
* Message size : yes, the previous issue (the 40 sec poll) was more apparent when message sizes were big (> 10KB). However, since "max.partition.fetch.bytes" is fixed at 1MB, record size should not increase poll time, as a poll has to carry the same number of bytes irrespective of per-record size. (However, I am not sure about the Kafka server internals regarding this.)
* Testing on a 2.4.0 cluster : I am still working on creating a 2.4.0 cluster. Will update with test results on that setup as well.

[kafka_streaming_without_buffer.pdf](https://github.com/apache/spark/files/2246522/kafka_streaming_without_buffer.pdf)
[kafka_streaming_with_buffer.pdf](https://github.com/apache/spark/files/2246523/kafka_streaming_with_buffer.pdf)
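The per-batch ceiling quoted above can be checked with a small back-of-the-envelope script. This is just the arithmetic from the comment, not code from the PR; the `waves` factor models each core processing Ceil(partitions / cores) partitions sequentially:

```python
import math

def expected_records_per_batch(batch_duration_ms, partitions, cores,
                               time_per_record_ms):
    # Each core handles ceil(partitions / cores) partitions in sequence
    # ("waves"), so the effective per-record cost scales by that factor.
    waves = math.ceil(partitions / cores)
    return (batch_duration_ms * partitions) / (waves * time_per_record_ms)

limit = expected_records_per_batch(batch_duration_ms=10_000,
                                   partitions=15,
                                   cores=5,
                                   time_per_record_ms=0.7)
print(round(limit))  # prints 71429, i.e. the ~71K figure above
```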
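To illustrate the idea of "maintaining a configured buffer on executors", here is a conceptual Python sketch, not the PR's actual Scala implementation: a background thread keeps a bounded queue topped up (standing in for Kafka polls) while batch processing drains it, so the consumer of the batch never blocks on a synchronous poll. The `poll` function and buffer size here are illustrative stand-ins:

```python
import itertools
import queue
import threading

source = itertools.count()           # fake record stream standing in for Kafka

def poll():
    # Stand-in for KafkaConsumer.poll(): returns the next chunk of records.
    return [next(source) for _ in range(100)]

buf = queue.Queue(maxsize=1000)      # the "configured buffer" on the executor

def prefetch():
    # Background prefetcher: keeps polling and filling the buffer;
    # put() blocks once the buffer is full, bounding memory use.
    while True:
        for record in poll():
            buf.put(record)

threading.Thread(target=prefetch, daemon=True).start()

# Batch processing drains records from the buffer instead of polling directly,
# so records are already local by the time the batch needs them.
batch = [buf.get(timeout=5) for _ in range(250)]
```

With a single producer and a FIFO queue, `batch` contains the first 250 records in order; in the real PR the buffering strategy is selected via the "spark.streaming.kafka.consumer.builder.name" parameter rather than hard-coded as here.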