[jira] [Commented] (HUDI-292) Consume more entries from kafka than specified sourceLimit.
[ https://issues.apache.org/jira/browse/HUDI-292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16947005#comment-16947005 ]

Vinoth Chandar commented on HUDI-292:
-

Sounds good, thanks for being awesome [~xleesf] :)

> Consume more entries from kafka than specified sourceLimit.
> ---
>
> Key: HUDI-292
> URL: https://issues.apache.org/jira/browse/HUDI-292
> Project: Apache Hudi (incubating)
> Issue Type: Improvement
> Components: Utilities
> Reporter: leesf
> Assignee: leesf
> Priority: Major
> Fix For: 0.5.1
>
>
> When _CheckpointUtils#computeOffsetRanges_ computes the offset ranges for consuming Kafka messages, it can allocate more events than requested.
> Given
> topic = "test",
> fromOffsets (partition -> offset) = (0 -> 0), (1 -> 0), (2 -> 0), (3 -> 0), (4 -> 0),
> toOffsets (partition -> offset) = (0 -> 100), (1 -> 1000), (2 -> 1000), (3 -> 1000), (4 -> 1000),
> numEvents = 1001.
> The output of _CheckpointUtils#computeOffsetRanges_ is
> OffsetRange(topic: 'test', partition: 0, range: [0 -> 100])
> OffsetRange(topic: 'test', partition: 1, range: [0 -> 226])
> OffsetRange(topic: 'test', partition: 2, range: [0 -> 226])
> OffsetRange(topic: 'test', partition: 3, range: [0 -> 226])
> OffsetRange(topic: 'test', partition: 4, range: [0 -> 226])
> The total count is 1004 (100 + 226 * 4), which is more than 1001, so more entries are consumed from Kafka than the specified 1001.
> CC [~vinoth]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
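To make the arithmetic in the description reproducible, here is a minimal Java sketch of a round-robin allocation, written from the behavior described in this thread rather than copied from Hudi. Assumptions: partitions are plain array indexes, all from-offsets are supplied, and the class and method names (OffsetAllocationRepro, computeUntilOffsets) are hypothetical. The loop shape (ceil-divide the remaining events across non-exhausted partitions, then cap each partition only by its max offset) is an assumed model; with the issue's inputs it yields the same ranges listed above.

```java
import java.util.Arrays;

// Hypothetical, simplified model of the allocation discussed in HUDI-292.
// Partitions are array indexes; this is NOT the actual CheckpointUtils source.
class OffsetAllocationRepro {

    // Returns the chosen until-offset for each partition, starting from fromOffsets.
    static long[] computeUntilOffsets(long[] fromOffsets, long[] toOffsets, long numEvents) {
        int n = toOffsets.length;
        long[] until = Arrays.copyOf(fromOffsets, n); // every range starts empty: [from -> from]
        boolean[] exhausted = new boolean[n];
        int exhaustedCount = 0;
        long allocated = 0;

        // Keep going while events remain to allocate and some partition still has data.
        while (allocated < numEvents && exhaustedCount < n) {
            long remainingEvents = numEvents - allocated;
            long eventsPerPartition =
                    (long) Math.ceil((double) remainingEvents / (n - exhaustedCount));

            for (int i = 0; i < n; i++) {
                if (exhausted[i]) {
                    continue;
                }
                // Same shape as the line quoted in the thread: capped by the partition's
                // max offset but not by remainingEvents, so the last round can overshoot.
                long toOffset = Math.min(toOffsets[i], until[i] + eventsPerPartition);
                if (toOffset == toOffsets[i]) {
                    exhausted[i] = true;
                    exhaustedCount++;
                }
                allocated += toOffset - until[i];
                until[i] = toOffset;
            }
        }
        return until;
    }

    public static void main(String[] args) {
        long[] from = {0, 0, 0, 0, 0};
        long[] to = {100, 1000, 1000, 1000, 1000};
        long[] until = computeUntilOffsets(from, to, 1001);

        long total = 0;
        for (int i = 0; i < until.length; i++) {
            total += until[i] - from[i];
        }
        // Prints: [100, 226, 226, 226, 226] total=1004
        System.out.println(Arrays.toString(until) + " total=" + total);
    }
}
```

In this model the first round hands out ceil(1001 / 5) = 201 per partition (partition 0 stops at 100), leaving 97 events; the second round hands out ceil(97 / 4) = 25 to each of the four remaining partitions, which is 100 events against 97 left, hence the 3-event overshoot.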
[jira] [Commented] (HUDI-292) Consume more entries from kafka than specified sourceLimit.
[ https://issues.apache.org/jira/browse/HUDI-292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16946383#comment-16946383 ]

leesf commented on HUDI-292:

Using `long toOffset = Math.min(toOffsetMax, range.untilOffset() + eventsPerPartition);` to compute the offset is fine, but we should handle the case in which remainingEvents is less than `toOffset - range.untilOffset()`. The impact may be small even when some partitions consume extra entries, but we had better fix it. I would like to open a PR to fix it. CC [~vinoth]
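One way to apply leesf's suggestion is to cap each partition's share by the events still unallocated at that moment, i.e. `Math.min(eventsPerPartition, numEvents - allocated)`. The sketch below is a self-contained, hypothetical model (the class and method names are made up, partitions are array indexes, and the round-robin loop is an assumed simplification); it is not the PR that eventually resolved HUDI-292. With the issue's inputs it stops at exactly 1001 events.

```java
import java.util.Arrays;

// Hypothetical model of a capped round-robin allocation; NOT the actual Hudi fix.
class CappedOffsetAllocation {

    static long[] computeUntilOffsets(long[] fromOffsets, long[] toOffsets, long numEvents) {
        int n = toOffsets.length;
        long[] until = Arrays.copyOf(fromOffsets, n); // every range starts empty
        boolean[] exhausted = new boolean[n];
        int exhaustedCount = 0;
        long allocated = 0;

        while (allocated < numEvents && exhaustedCount < n) {
            long remainingEvents = numEvents - allocated;
            long eventsPerPartition =
                    (long) Math.ceil((double) remainingEvents / (n - exhaustedCount));

            // Stop mid-round as soon as the limit is reached.
            for (int i = 0; i < n && allocated < numEvents; i++) {
                if (exhausted[i]) {
                    continue;
                }
                // Cap the share by what is still unallocated, as suggested in the thread.
                long budget = Math.min(eventsPerPartition, numEvents - allocated);
                long toOffset = Math.min(toOffsets[i], until[i] + budget);
                if (toOffset == toOffsets[i]) {
                    exhausted[i] = true;
                    exhaustedCount++;
                }
                allocated += toOffset - until[i];
                until[i] = toOffset;
            }
        }
        return until;
    }

    public static void main(String[] args) {
        long[] from = {0, 0, 0, 0, 0};
        long[] to = {100, 1000, 1000, 1000, 1000};
        long[] until = computeUntilOffsets(from, to, 1001);

        long total = 0;
        for (int i = 0; i < until.length; i++) {
            total += until[i] - from[i];
        }
        // Prints: [100, 226, 226, 226, 223] total=1001
        System.out.println(Arrays.toString(until) + " total=" + total);
    }
}
```

With the cap, partition 4 receives only the 22 events left in the final round. The trade-off of this particular approach is that partitions later in iteration order absorb the shortfall, so the split is slightly uneven.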
[jira] [Commented] (HUDI-292) Consume more entries from kafka than specified sourceLimit.
[ https://issues.apache.org/jira/browse/HUDI-292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16945994#comment-16945994 ]

Vinoth Chandar commented on HUDI-292:
-

Is it because `long toOffset = Math.min(toOffsetMax, range.untilOffset() + eventsPerPartition);` does not honor `remainingEvents`? Good catch, but I'm wondering how severe the actual effect can be, since we bound by eventsPerPartition anyway?
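The severity question can be answered with a short bound, assuming the allocation behaves as a round-robin loop in which each round gives every non-exhausted partition at most eventsPerPartition = ceil(r / k) events and the loop ends after the first round that reaches the limit. The symbols are notation introduced here: r is remainingEvents at the start of the last round, k the number of non-exhausted partitions in that round, A the events allocated in that round, and P the total number of partitions.

```latex
\begin{aligned}
e &= \left\lceil r/k \right\rceil < r/k + 1 \\
A &\le k\,e < r + k \\
A - r &\le k - 1 \le P - 1 \quad (\text{overshoot; } A - r \text{ is an integer}) \\
r = 97,\ k = 4,\ e = 25 &\;\Rightarrow\; A - r = 100 - 97 = 3 = 1004 - 1001
\end{aligned}
```

Under that assumption the overshoot is fewer than k events in total (at most P - 1), so it is small but non-zero, consistent with both comments.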