[
https://issues.apache.org/jira/browse/FLINK-4080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Tzu-Li (Gordon) Tai updated FLINK-4080:
---
Description:
ManualExactlyOnceTest occasionally fails for me when run several times in a
row.
Kinesis records of the same aggregated batch will have the same sequence
number, and different sub-sequence numbers
(http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
The consumer currently commits the sequence number to state every time it
finishes processing a record, including each de-aggregated sub-record. This is
a bug: all sub-records of a batch share one sequence number, so committing
after the first one incorrectly marks the remaining records of the batch as
processed in the state.
Proposed fix:
1. Use the extended `UserRecord` class in KCL to represent all records (either
non- or de-aggregated) instead of the basic `Record` class. This gives access
to whether or not the record was originally aggregated.
2. The sequence number state we are checkpointing needs to be able to indicate
that the last seen sequence number of a shard may belong to a de-aggregated
record, e.g., {"shard0" -> "5:8", "shard1" -> "2"}, meaning the 8th sub-record of the
5th record was last seen for shard 0. On restore, we start again from record 5
for shard 0 and skip the first 8 sub-records, since those were already
processed; for shard 1 we start from record 3, since record 2 is
non-aggregated and already fully processed.
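The checkpoint format in point 2 could be sketched roughly as follows. This is only an illustration, not the connector's code: the `Position` class, its `parse`, `covers`, and `resumeFrom` methods, and the use of `long` for sequence numbers are all hypothetical (real Kinesis sequence numbers are opaque strings, not consecutive integers). The sketch assumes that the checkpointed sub-record counts as fully processed, so restore resumes with the sub-record after it.

```java
public class SubSequenceCheckpointSketch {

    /**
     * Hypothetical checkpoint entry for one shard.
     * "5:8" = 8th sub-record of record 5 was last seen; "2" = record 2, not aggregated.
     */
    static final class Position {
        final long sequenceNumber;
        final int subRecord; // 1-based position inside an aggregated record; 0 if not aggregated

        Position(long sequenceNumber, int subRecord) {
            this.sequenceNumber = sequenceNumber;
            this.subRecord = subRecord;
        }

        /** Parses the "seq" or "seq:sub" notation used in the issue description. */
        static Position parse(String s) {
            int colon = s.indexOf(':');
            if (colon < 0) {
                return new Position(Long.parseLong(s), 0);
            }
            return new Position(
                    Long.parseLong(s.substring(0, colon)),
                    Integer.parseInt(s.substring(colon + 1)));
        }

        boolean isAggregated() {
            return subRecord > 0;
        }

        /** True if the record at (seq, sub) was already processed before this checkpoint. */
        boolean covers(long seq, int sub) {
            if (seq != sequenceNumber) {
                return seq < sequenceNumber;
            }
            // Same record: a non-aggregated record is done; an aggregated one
            // is done only up to the checkpointed sub-record.
            return !isAggregated() || sub <= subRecord;
        }

        /**
         * Record to fetch first on restore. An aggregated record is re-fetched so
         * that its remaining sub-records can be emitted. The "+ 1" mirrors the
         * simplified consecutive numbering of the example only.
         */
        long resumeFrom() {
            return isAggregated() ? sequenceNumber : sequenceNumber + 1;
        }
    }

    public static void main(String[] args) {
        Position shard0 = Position.parse("5:8");
        Position shard1 = Position.parse("2");

        System.out.println(shard0.resumeFrom());  // 5
        System.out.println(shard0.covers(5, 8));  // true  (skip on restore)
        System.out.println(shard0.covers(5, 9));  // false (emit on restore)
        System.out.println(shard1.resumeFrom());  // 3
    }
}
```

On restore, a consumer along these lines would fetch from `resumeFrom()` and drop every de-aggregated record for which `covers(...)` returns true before emitting the rest.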
was:
I've occasionally experienced unsuccessful ManualExactlyOnceTest after several
tries.
Kinesis records of the same aggregated batch will have the same sequence
number, and different sub-sequence numbers
(http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
The current code of the consumer is committing state every time it finishes
processing a record, even de-aggregated ones. This is a bug since this will
incorrectly mark all remaining records of the de-aggregated batch as processed
in the state.
Proposed fix: Use the extended `UserRecord` class in KCL to represent all
records (either non- or de-aggregated) instead of the basic `Record` class.
This gives access to whether or not the record was originally aggregated. If we
encounter a de-aggregated record, don't update state until we finished
processing the last record of the batch.
> Kinesis consumer not exactly-once if stopped in the middle of processing
> aggregated records
> ---
>
> Key: FLINK-4080
> URL: https://issues.apache.org/jira/browse/FLINK-4080
> Project: Flink
> Issue Type: Sub-task
> Components: Kinesis Connector, Streaming Connectors
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Critical
> Fix For: 1.1.0