Github user sidhavratha commented on the issue:
https://github.com/apache/spark/pull/21685
Our Kafka team has resolved the issue of the 40 sec poll delay; it was caused by
some faulty hardware.
However, these changes still make sense for getting better throughput per batch.
As you know, Kafka already pre-fetches records, but it pre-fetches only one
additional poll's worth. If a batch requires a number of records that takes many
polls to fetch, processing will block on the Kafka poll.
This blocking Kafka poll can be avoided, and better throughput achieved, if we
maintain a configured buffer on the executors (enabled via configuration).
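The buffering idea can be sketched roughly as follows. This is a minimal Java sketch, not the PR's actual code: `BufferedFetcher` and the record source are illustrative, with a `Supplier` standing in for `KafkaConsumer.poll()`. A background thread keeps a bounded buffer filled so the batch-processing thread drains already-fetched records instead of blocking on a slow poll.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

// Hypothetical sketch: a background thread keeps a bounded buffer filled,
// so the processing thread never blocks waiting on a poll.
public class BufferedFetcher {
    private final BlockingQueue<String> buffer;
    private final Thread filler;
    private volatile boolean running = true;

    // `poll` stands in for KafkaConsumer.poll(); any record source works here.
    public BufferedFetcher(int capacity, Supplier<List<String>> poll) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
        this.filler = new Thread(() -> {
            while (running) {
                for (String record : poll.get()) {
                    try {
                        buffer.put(record); // blocks only when the buffer is full
                    } catch (InterruptedException e) {
                        return; // stop() interrupts us
                    }
                }
            }
        });
        filler.setDaemon(true);
        filler.start();
    }

    // Drain up to `max` already-buffered records without waiting on the source.
    public List<String> take(int max) {
        List<String> out = new ArrayList<>();
        buffer.drainTo(out, max);
        return out;
    }

    public void stop() {
        running = false;
        filler.interrupt();
    }
}
```

The buffer capacity bounds memory use on the executor; when the buffer is full the filler thread simply blocks, applying back-pressure on the fetch side.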
Attaching 2 files:
kafka_streaming_without_buffer.pdf : the streaming job without the buffer is
able to achieve ~50K records per batch
kafka_streaming_with_buffer.pdf : the streaming job with the buffer is able to
maintain ~70K records per batch
For my test job
Batch duration = 10 sec
Kafka Topic Partitions = 15
Cores = 5
Average Processing time per record = 0.7 ms
Hence,
Expected number of records per batch
= (Batch duration * Kafka topic partitions) / (Ceil(Kafka partitions / Cores) * Time per record)
= (10 * 1000 * 15) / (Ceil(15 / 5) * 0.7)
= 150000 / 2.1
= 71428.57
Expected number of records per batch = 71428.57 = ~71K
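The calculation above can be reproduced directly (the variable names are mine, the numbers are from the test job):

```java
// Reproducing the expected-throughput calculation for the test job.
public class ThroughputCalc {
    public static void main(String[] args) {
        double batchMs = 10 * 1000;  // 10 sec batch duration, in ms
        int partitions = 15;         // Kafka topic partitions
        int cores = 5;
        double msPerRecord = 0.7;    // average processing time per record

        // Each core handles ceil(partitions / cores) partitions sequentially,
        // which divides the per-partition time budget by that factor.
        int partitionsPerCore = (int) Math.ceil((double) partitions / cores);
        double expected = (batchMs * partitions) / (partitionsPerCore * msPerRecord);

        System.out.printf("Expected records per batch: %.2f%n", expected); // ~71428.57
    }
}
```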
As per this calculation, ~71K is the maximum throughput that can be achieved.
Without the buffer, throughput is lower since some time is wasted on blocked
Kafka polls.
In this PR I am making the Kafka polling strategy configurable, so that it is
possible to plug in an external strategy as well.
The name of the class used to create the Kafka consumer can be set via the
"spark.streaming.kafka.consumer.builder.name" parameter.
A few points which you asked about last time:
   * Tests with receivers : This change is not applicable to receivers. The
changes are only for the direct approach.
   * Guarantees when the driver dies : I have tested the scenario where the
driver dies. Since the newly started driver starts from the last committed
offset, it works as expected and does not lose any data.
   * Message size : Yes, the previous issue (40 sec poll) was more apparent
when messages were large (> 10KB). However, since "max.partition.fetch.bytes"
is fixed at 1MB, it should not increase poll time, since a poll has to carry
the same number of bytes irrespective of per-record size. (However, I am not
sure about the Kafka server internals regarding this.)
   * Testing on a 2.4.0 cluster : I am still working on creating a 2.4.0
cluster. I will update with test results on that setup as well.
[kafka_streaming_without_buffer.pdf](https://github.com/apache/spark/files/2246522/kafka_streaming_without_buffer.pdf)
[kafka_streaming_with_buffer.pdf](https://github.com/apache/spark/files/2246523/kafka_streaming_with_buffer.pdf)