Github user QuentinAmbard commented on the issue:
https://github.com/apache/spark/pull/21917
With this solution we don't read the data another time "just to support transactions".
The current implementation for compacted topics already reads all the
messages twice in order to get a correct count for the input info tracker: `val
inputInfo = StreamInputInfo(id, rdd.count, metadata)`
Since RDDs are compacted or have "empty offsets" due to transaction
markers/aborts, the only way to get the exact count is to read the data.
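To make "the only way is to read" concrete, here is a minimal, hypothetical sketch against the plain Kafka consumer API (the object and method names, broker address and consumer settings are assumptions for the example, not code from this PR). It counts only the records that actually come back in an offset range, which stays correct when offsets are missing because of compaction or aborted transactions:
```scala
import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

object ExactCountSketch {
  // Counts the records actually returned in [fromOffset, untilOffset) of one partition.
  // With compaction and transaction markers this is usually smaller than
  // untilOffset - fromOffset, and reading is the only way to get the exact value.
  def countRecords(tp: TopicPartition, fromOffset: Long, untilOffset: Long): Long = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // assumed broker address
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("isolation.level", "read_committed")   // skip records from aborted transactions
    props.put("enable.auto.commit", "false")
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      consumer.assign(Collections.singletonList(tp))
      consumer.seek(tp, fromOffset)
      var count = 0L
      // position() moves past compacted/aborted offsets, so the loop ends once the
      // consumer has passed untilOffset (assuming untilOffset <= the log-end offset).
      while (consumer.position(tp) < untilOffset) {
        val records = consumer.poll(Duration.ofMillis(500)).records(tp).asScala
        count += records.count(_.offset < untilOffset)
      }
      count
    } finally {
      consumer.close()
    }
  }
}
```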
The same logic applies when selecting a range: if you want to get N records per
partition, the only way to know which "untilOffset" will make you read N records
is to read, stop once you've read N records, and take the offset you stopped at.
So one of the advantages is being able to fetch exactly the number of records per
partition you really want (for compacted topics and transactions).
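As a rough illustration of that per-partition limit (again a hypothetical sketch with assumed names, taking a known log-end offset as a parameter; it is not the PR's actual implementation):
```scala
import java.time.Duration
import java.util.Collections
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

object RateLimitSketch {
  // Reads forward from fromOffset and stops once maxRecords real records have come
  // back (or the known log-end offset is reached). The result is the exclusive
  // "untilOffset" that would yield exactly those records when the range is re-read.
  def untilOffsetFor(consumer: Consumer[Array[Byte], Array[Byte]],
                     tp: TopicPartition,
                     fromOffset: Long,
                     maxRecords: Long,
                     logEndOffset: Long): Long = {
    consumer.assign(Collections.singletonList(tp))
    consumer.seek(tp, fromOffset)
    var seen = 0L
    var untilOffset = fromOffset
    while (seen < maxRecords && consumer.position(tp) < logEndOffset) {
      val batch = consumer.poll(Duration.ofMillis(500)).records(tp).asScala
      for (r <- batch if seen < maxRecords) {
        seen += 1
        untilOffset = r.offset + 1 // exclusive bound that still includes this record
      }
    }
    untilOffset
  }
}
```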
But the real advantage is that it lets you pick an "untilOffset" that is not
empty.
For example, if you don't have any records for offsets 4 to 6:
```
offset: 1, 2, 3, 4, 5, 6
value:  a, b, c,  ,  ,
```
If you use an offset range of [1, 5), you will try to read offset 4 but won't receive
any data. In this scenario you can't tell whether the data is simply missing (which is ok) or
you lost data because Kafka is down (not ok).
To deal with this situation, we first scan the offsets and stop at the last
offset where we had data. In the example, instead of [1, 5) we would go with
[1, 4), because 3 has data, so it's safe to stop at 3.
During the next batch, if extra data arrives, we would select the
next range as [4, 8) and won't have any issue:
```
offset: 1, 2, 3, 4, 5, 6, 7
value:  a, b, c,  ,  ,  , g
```
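A tiny sketch of that trimming rule (names are hypothetical and the real PR works on consumer results rather than a plain `Seq`; this only shows the offset arithmetic):
```scala
object SafeUntilOffset {
  // Shrinks a candidate range so it never ends on an offset for which no record was
  // returned: the new untilOffset sits right after the last record actually read.
  // Offsets skipped at the tail are simply retried in the following batch.
  def trimToLastRecord(fromOffset: Long,
                       candidateUntil: Long,
                       returnedOffsets: Seq[Long]): Long =
    if (returnedOffsets.isEmpty) fromOffset // nothing came back yet: retry the whole range
    else math.min(candidateUntil, returnedOffsets.max + 1)
}

// With the example above, records came back at offsets 1, 2 and 3, so:
//   SafeUntilOffset.trimToLastRecord(1L, 5L, Seq(1L, 2L, 3L)) == 4L  // range becomes [1, 4)
// and the next batch can safely use [4, 8), which returns the record "g" at offset 7.
```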
Does that make sense?
(PS: sorry for the multiple commits, I wasn't running MiMa properly; I'll
fix the remaining issues soon)