Github user QuentinAmbard commented on the issue:

    https://github.com/apache/spark/pull/21917
  
    With this solution we don't read the data another time "just to support transactions".
    The current implementation of compacted topics already reads all the messages twice in order to get a correct count for the input info tracker: `val inputInfo = StreamInputInfo(id, rdd.count, metadata)`
    Since RDDs are compacted or have "empty offsets" due to transaction markers/aborts, the only way to get the exact count is to read the data.
    The same logic applies to selecting a range: if you want to get N records per partition, the only way to know which "untilOffset" will make you read N records is to read, stop when you've read N records, and take that offset.
    So one of the advantages is being able to fetch exactly the number of records per partition you really want (for compacted topics and transactions).
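    Very roughly, this "read until you have N records" scan could look like the sketch below. This is not the code of this PR: `untilOffsetForRecordCount` and the consumer wiring are invented for illustration, and it assumes a consumer configured with `isolation.level=read_committed`, so aborted records and transaction markers are already filtered out.

    ```scala
    import java.time.Duration
    import java.util.Collections
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition
    import scala.collection.JavaConverters._

    // Hypothetical helper: read forward from `fromOffset` until `maxRecords`
    // records have actually been received (compacted-away offsets and
    // transaction markers simply never show up), and return the exclusive
    // untilOffset that yields exactly those records.
    def untilOffsetForRecordCount(
        consumer: KafkaConsumer[Array[Byte], Array[Byte]],
        tp: TopicPartition,
        fromOffset: Long,
        maxRecords: Int): Long = {
      consumer.assign(Collections.singletonList(tp))
      consumer.seek(tp, fromOffset)
      var read = 0
      var untilOffset = fromOffset
      var exhausted = false
      while (read < maxRecords && !exhausted) {
        val records = consumer.poll(Duration.ofMillis(500)).records(tp).asScala
        if (records.isEmpty) {
          exhausted = true                   // no more data available for now
        } else {
          records.foreach { r =>
            if (read < maxRecords) {
              read += 1
              untilOffset = r.offset() + 1   // exclusive upper bound
            }
          }
        }
      }
      untilOffset
    }
    ```
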
    But the real advantage is that it lets you pick an "untilOffset" that is not empty.
    For example, if you don't have records for offsets [4, 6]:
    
    ```
    1, 2, 3, 4, 5, 6
    a, b, c,  ,  ,
    ```
    
    If you use an offset range of [1, 5), you will try to read offset 4 but won't receive any data. In this scenario you can't tell whether the data is genuinely missing (which is OK) or you lost some data because Kafka is down (not OK).
    
    To deal with this situation, we first scan the offsets and stop at the last offset where we had data: in the example, instead of [1, 5) we would go with [1, 4), because offset 3 has data so it's safe to stop at 3.
    During the next batch, if extra data has arrived, we would then select the next range as [4, 8) and won't have any issue:
    ```
    1, 2, 3, 4, 5, 6, 7
    a, b, c,  ,  ,  , g
    ```
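
    A minimal sketch of that scan, under the same assumptions as above (hypothetical `lastNonEmptyUntilOffset`, `read_committed` consumer, not this PR's actual implementation):

    ```scala
    import java.time.Duration
    import java.util.Collections
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition
    import scala.collection.JavaConverters._

    // Hypothetical helper: scan [fromOffset, desiredUntil) and return an
    // untilOffset that ends right after the last offset that actually had
    // data, so the chosen "untilOffset" is never an empty one and the next
    // batch can safely resume from there.
    def lastNonEmptyUntilOffset(
        consumer: KafkaConsumer[Array[Byte], Array[Byte]],
        tp: TopicPartition,
        fromOffset: Long,
        desiredUntil: Long): Long = {
      consumer.assign(Collections.singletonList(tp))
      consumer.seek(tp, fromOffset)
      var lastSeen = fromOffset - 1          // offset of the last record received
      var done = false
      while (!done && lastSeen + 1 < desiredUntil) {
        val records = consumer.poll(Duration.ofMillis(500)).records(tp).asScala
        if (records.isEmpty) {
          done = true                        // nothing more to read for now
        } else {
          records.filter(_.offset() < desiredUntil).foreach(r => lastSeen = r.offset())
          if (records.exists(_.offset() >= desiredUntil)) done = true
        }
      }
      // With the first example above: fromOffset = 1, desiredUntil = 5 and data
      // at offsets 1..3 => lastSeen = 3, so we return 4 and the range is [1, 4).
      if (lastSeen >= fromOffset) lastSeen + 1 else fromOffset
    }
    ```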
    
    Does that make sense?
    (PS: sorry for the multiple commits, I wasn't running MiMa properly, I'll fix the remaining issue soon)

