Github user sidhavratha commented on the issue:

    https://github.com/apache/spark/pull/21685
  
    Our Kafka team has resolved the issue regarding the 40 sec poll delay; it was caused by some faulty hardware.
    However, these changes still make sense for getting better throughput per batch. As you know, Kafka already pre-fetches records, but it pre-fetches only one additional poll's worth. If a batch requires a number of records that takes many polls to fetch, processing will block on the Kafka poll.
    
    This blocking Kafka poll can be avoided, depending on configuration, and better throughput can be achieved, if we maintain a configured buffer on the executors, as sketched below.
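
    To illustrate the idea, here is a minimal sketch of what such an executor-side buffer could look like. The class name and structure are mine, not this PR's, and offset handling is elided:

```scala
import java.util.{Collections, Properties}
import java.util.concurrent.LinkedBlockingQueue

import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}

// Hypothetical sketch: a background thread keeps polling Kafka and fills a
// bounded queue, so the processing side never blocks on poll() as long as
// the buffer still holds records.
class BufferedKafkaPoller(props: Properties, topic: String, bufferSize: Int) {
  private val buffer =
    new LinkedBlockingQueue[ConsumerRecord[String, String]](bufferSize)

  private val poller = new Thread(new Runnable {
    override def run(): Unit = {
      // KafkaConsumer is not thread-safe, so only this thread touches it.
      val consumer = new KafkaConsumer[String, String](props)
      consumer.subscribe(Collections.singletonList(topic))
      try {
        while (!Thread.currentThread().isInterrupted) {
          val records = consumer.poll(500) // timeout in ms
          val it = records.iterator()
          while (it.hasNext) buffer.put(it.next()) // put() blocks when full
        }
      } finally consumer.close()
    }
  })
  poller.setDaemon(true)
  poller.start()

  // Processing side takes from the in-memory buffer instead of polling.
  def next(): ConsumerRecord[String, String] = buffer.take()
}
```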
    
    Attaching 2 files:
    kafka_streaming_without_buffer.pdf : the streaming job without the buffer is able to achieve ~50K records per batch
    kafka_streaming_with_buffer.pdf    : the streaming job with the buffer is able to maintain ~70K records per batch
    
    For my test job:
        Batch duration = 10 sec
        Kafka Topic Partitions = 15
        Cores = 5
        Average Processing time per record = 0.7 ms
    
        Hence,
        Expected records per batch = (Batch duration * Kafka topic partitions) / (Ceil(Kafka partitions / Cores) * Time per record)
                                   = (10 * 1000 * 15) / (Ceil(15 / 5) * 0.7)
                                   = 150000 / 2.1
                                   = 71428.57
        Expected records per batch = ~71K
    As per the calculation, ~71K is the maximum throughput that can be achieved. Without the buffer, throughput is lower since some time is wasted on the blocked Kafka poll.
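
    For reference, the same calculation as a small runnable snippet (the variable names are mine):

```scala
object ExpectedBatchSize extends App {
  val batchDurationMs = 10 * 1000 // 10 sec batch
  val partitions      = 15        // Kafka topic partitions
  val cores           = 5         // executor cores
  val perRecordMs     = 0.7       // average processing time per record

  // Each core works through Ceil(partitions / cores) partitions in sequence,
  // so every partition effectively gets that fraction of the batch duration.
  val partitionsPerCore = math.ceil(partitions.toDouble / cores)
  val expected = (batchDurationMs * partitions) / (partitionsPerCore * perRecordMs)

  println(f"Expected records per batch = $expected%.2f") // ~71428.57
}
```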
    
    In this PR I am making the Kafka polling strategy configurable, so that it is possible to plug in an external strategy as well.
    The class name used to create the Kafka consumer can be defined via the "spark.streaming.kafka.consumer.builder.name" parameter.
    
    
    A few points which you asked about last time:
    
    *   Tests with receivers : This change is not applicable to receivers. The changes are only for the direct approach.
    *   Guarantees when the driver dies : I have tested the scenario in which the driver dies. Since the newly started driver starts from the last committed offset, it works as expected and does not lose any data (see the sketch after this list).
    *   Message size : Yes, the previous issue (the 40 sec poll) was more apparent when message sizes were big, > 10KB. However, since "max.partition.fetch.bytes" is fixed at 1MB, it should not cause longer poll times, since a poll has to carry the same number of bytes irrespective of per-record size. (However, I am not sure about the Kafka server internals regarding this.)
    *   Testing on a 2.4.0 cluster : I am still working on creating a 2.4.0 cluster. I will update with test results on that setup as well.
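
    For context, the driver-recovery test relies on the standard offset-commit pattern from the kafka-0-10 direct API; a minimal sketch (the bootstrap servers, topic, and group id are placeholders):

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object DriverRecoveryTest extends App {
  val conf = new SparkConf().setAppName("driver-recovery-test")
  val ssc  = new StreamingContext(conf, Seconds(10))

  // bootstrap.servers, the topic, and group.id below are placeholders.
  val kafkaParams = Map[String, Object](
    "bootstrap.servers"  -> "localhost:9092",
    "key.deserializer"   -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id"           -> "test-group",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  val stream = KafkaUtils.createDirectStream[String, String](
    ssc, PreferConsistent, Subscribe[String, String](Seq("test-topic"), kafkaParams))

  stream.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // ... process the batch here ...
    // Commit only after the batch succeeds; a restarted driver then resumes
    // from the last committed offset, so no data is lost.
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }

  ssc.start()
  ssc.awaitTermination()
}
```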
    
    
[kafka_streaming_without_buffer.pdf](https://github.com/apache/spark/files/2246522/kafka_streaming_without_buffer.pdf)
    
[kafka_streaming_with_buffer.pdf](https://github.com/apache/spark/files/2246523/kafka_streaming_with_buffer.pdf)


