[jira] [Updated] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records

2016-06-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4080:
---
Description: 
I've occasionally seen ManualExactlyOnceTest fail when running it several
times.

Kinesis records of the same aggregated batch have the same sequence number but
different sub-sequence numbers
(http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
The consumer currently commits state every time it finishes processing a
record, including de-aggregated ones. This is a bug: since the state holds only
the sequence number, committing it before a batch is finished incorrectly marks
all remaining records of the de-aggregated batch as processed.
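To make the failure mode concrete, here is a minimal Java simulation of the behaviour described above. It is a sketch, not the connector's code: `SubRecord`, `BuggyCheckpointDemo` and their methods are hypothetical, sequence numbers are simplified to small integers (real Kinesis sequence numbers are much larger numeric strings), and restore is modelled as "skip every record whose sequence number is not greater than the checkpointed one".

```java
import java.util.ArrayList;
import java.util.List;

// Simulates the buggy behaviour: the state holds only a sequence number and is
// overwritten after every record, including de-aggregated ones.
class BuggyCheckpointDemo {

    // Processes the first `stopAfter` records, updating state after each one,
    // and returns the sequence number that would be checkpointed at that point.
    static long processThenStop(List<SubRecord> shard, int stopAfter, List<SubRecord> emitted) {
        long checkpointed = -1; // nothing processed yet
        for (int i = 0; i < stopAfter && i < shard.size(); i++) {
            SubRecord r = shard.get(i);
            emitted.add(r);
            checkpointed = r.sequenceNumber; // BUG: marks the whole batch as done
        }
        return checkpointed;
    }

    // Restores from the checkpoint: any record whose sequence number is not
    // greater than the checkpointed one is treated as already processed.
    static void restore(List<SubRecord> shard, long checkpointed, List<SubRecord> emitted) {
        for (SubRecord r : shard) {
            if (r.sequenceNumber > checkpointed) {
                emitted.add(r);
            }
        }
    }

    // Simplified shard: plain record 4, aggregated record 5 (3 sub-records), plain record 6.
    static List<SubRecord> exampleShard() {
        List<SubRecord> shard = new ArrayList<>();
        shard.add(new SubRecord(4, -1));
        shard.add(new SubRecord(5, 0));
        shard.add(new SubRecord(5, 1));
        shard.add(new SubRecord(5, 2));
        shard.add(new SubRecord(6, -1));
        return shard;
    }

    public static void main(String[] args) {
        List<SubRecord> shard = exampleShard();
        List<SubRecord> emitted = new ArrayList<>();
        long checkpointed = processThenStop(shard, 2, emitted); // stop right after 5:0
        restore(shard, checkpointed, emitted);
        // prints: emitted [4, 5:0, 6] of [4, 5:0, 5:1, 5:2, 6]
        System.out.println("emitted " + emitted + " of " + shard);
    }
}

// Hypothetical stand-in for a (possibly de-aggregated) Kinesis record.
// A sub-sequence number of -1 means the record was not aggregated.
class SubRecord {
    final long sequenceNumber;
    final long subSequenceNumber;

    SubRecord(long sequenceNumber, long subSequenceNumber) {
        this.sequenceNumber = sequenceNumber;
        this.subSequenceNumber = subSequenceNumber;
    }

    @Override
    public String toString() {
        return subSequenceNumber < 0 ? Long.toString(sequenceNumber)
                                     : sequenceNumber + ":" + subSequenceNumber;
    }
}
```

Stopping right after sub-record 5:0 leaves 5 in the state, so after the restore sub-records 5:1 and 5:2 are never emitted.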

Proposed fix:
1. Use the extended `UserRecord` class in KCL to represent all records (either 
non- or de-aggregated) instead of the basic `Record` class. This gives access 
to whether or not the record was originally aggregated.
2. The sequence number state we checkpoint needs to be able to indicate that
the last seen sequence number of a shard may belong to a de-aggregated record,
e.g., {"shard0" -> "5:8", "shard1" -> "2"} means that sub-record 8 of record 5
was the last one seen for shard 0. On restore, we start again from record 5 for
shard 0 and skip every sub-record up to and including sub-record 8; for shard 1,
however, we start from record 3, since record 2 is non-aggregated and already
fully processed.
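A minimal sketch of the per-shard state from point 2, assuming the "5:8" / "2" string encoding used in the example. `ShardPosition` and `RestoreDemo` are hypothetical names, and sequence numbers are again simplified to `long`s. The restart decision is expressed with the names of two real Kinesis shard iterator types, `AT_SEQUENCE_NUMBER` and `AFTER_SEQUENCE_NUMBER`, mirrored here as a local enum rather than taken from the AWS SDK; "start from record 3" in the description corresponds to resuming after record 2.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the proposed per-shard state, e.g. {"shard0" -> "5:8", "shard1" -> "2"}.
class RestoreDemo {

    public static void main(String[] args) {
        Map<String, ShardPosition> state = new HashMap<>();
        state.put("shard0", ShardPosition.parse("5:8"));
        state.put("shard1", ShardPosition.parse("2"));

        ShardPosition shard0 = state.get("shard0");
        System.out.println("shard0: " + shard0.restartIterator() + " " + shard0.sequenceNumber);
        System.out.println("skip 5:8? " + shard0.alreadyProcessed(5, 8)); // prints "skip 5:8? true"
        System.out.println("skip 5:9? " + shard0.alreadyProcessed(5, 9)); // prints "skip 5:9? false"

        ShardPosition shard1 = state.get("shard1");
        System.out.println("shard1: " + shard1.restartIterator() + " " + shard1.sequenceNumber);
    }
}

// Local mirror of two Kinesis shard iterator type names (not the AWS SDK enum).
enum IteratorType { AT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER }

// Hypothetical last-seen position for one shard.
class ShardPosition {
    static final long NOT_AGGREGATED = -1;

    final long sequenceNumber;
    final long subSequenceNumber; // NOT_AGGREGATED for a plain record

    ShardPosition(long sequenceNumber, long subSequenceNumber) {
        this.sequenceNumber = sequenceNumber;
        this.subSequenceNumber = subSequenceNumber;
    }

    // Parses "5:8" (sub-record 8 of record 5) or "2" (plain record 2).
    static ShardPosition parse(String encoded) {
        int colon = encoded.indexOf(':');
        if (colon < 0) {
            return new ShardPosition(Long.parseLong(encoded), NOT_AGGREGATED);
        }
        return new ShardPosition(Long.parseLong(encoded.substring(0, colon)),
                                 Long.parseLong(encoded.substring(colon + 1)));
    }

    boolean lastSeenWasAggregated() {
        return subSequenceNumber != NOT_AGGREGATED;
    }

    // A partly processed aggregated record must be read again; a plain record
    // is fully processed, so reading resumes after it.
    IteratorType restartIterator() {
        return lastSeenWasAggregated() ? IteratorType.AT_SEQUENCE_NUMBER
                                       : IteratorType.AFTER_SEQUENCE_NUMBER;
    }

    // Whether a (sub-)record read after the restart was already processed.
    // Plain records pass NOT_AGGREGATED as subSeq.
    boolean alreadyProcessed(long seq, long subSeq) {
        if (seq != sequenceNumber) {
            return seq < sequenceNumber;
        }
        return !lastSeenWasAggregated() || subSeq <= subSequenceNumber;
    }

    @Override
    public String toString() {
        return lastSeenWasAggregated() ? sequenceNumber + ":" + subSequenceNumber
                                       : Long.toString(sequenceNumber);
    }
}
```

Storing a sub-sequence number only when the last seen record was aggregated keeps non-aggregated entries in the same form as before, a plain sequence number such as "2".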

  was:
I've occasionally seen ManualExactlyOnceTest fail when running it several
times.

Kinesis records of the same aggregated batch have the same sequence number but
different sub-sequence numbers
(http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
The consumer currently commits state every time it finishes processing a
record, including de-aggregated ones. This is a bug: since the state holds only
the sequence number, committing it before a batch is finished incorrectly marks
all remaining records of the de-aggregated batch as processed.

Proposed fix: Use the extended `UserRecord` class in KCL to represent all
records (either non- or de-aggregated) instead of the basic `Record` class.
This gives access to whether or not the record was originally aggregated. If we
encounter a de-aggregated record, don't update state until we have finished
processing the last record of its batch.
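The earlier proposal, deferring the state update until a batch is complete, can be sketched as below. It assumes, hypothetically, that all sub-records of an aggregated record arrive consecutively in the list being processed, so the last one of a batch is the one followed by a different sequence number (or by the end of the list). `KinesisSubRecord` and `DeferredCheckpointDemo` are illustrative names, not connector classes.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the earlier proposal: advance the checkpointed sequence number only
// after a non-aggregated record, or after the last sub-record of a batch.
class DeferredCheckpointDemo {

    // Returns the sequence number that would be in the state after processing
    // the first `stopAfter` records of `shard` (-1 if nothing is complete yet).
    static long checkpointAfter(List<KinesisSubRecord> shard, int stopAfter) {
        long checkpointed = -1;
        for (int i = 0; i < stopAfter && i < shard.size(); i++) {
            KinesisSubRecord r = shard.get(i);
            // ... emit r downstream here ...
            boolean batchComplete = !r.aggregated
                    || i + 1 == shard.size()
                    || shard.get(i + 1).sequenceNumber != r.sequenceNumber;
            if (batchComplete) {
                checkpointed = r.sequenceNumber;
            }
        }
        return checkpointed;
    }

    // Simplified shard: plain record 4, aggregated record 5 (3 sub-records), plain record 6.
    static List<KinesisSubRecord> exampleShard() {
        List<KinesisSubRecord> shard = new ArrayList<>();
        shard.add(new KinesisSubRecord(4, false));
        shard.add(new KinesisSubRecord(5, true));
        shard.add(new KinesisSubRecord(5, true));
        shard.add(new KinesisSubRecord(5, true));
        shard.add(new KinesisSubRecord(6, false));
        return shard;
    }

    public static void main(String[] args) {
        List<KinesisSubRecord> shard = exampleShard();
        for (int n = 0; n <= shard.size(); n++) {
            System.out.println("after " + n + " records, state = " + checkpointAfter(shard, n));
        }
    }
}

// Hypothetical stand-in for a record after de-aggregation.
class KinesisSubRecord {
    final long sequenceNumber;
    final boolean aggregated; // true if it came out of an aggregated Kinesis record

    KinesisSubRecord(long sequenceNumber, boolean aggregated) {
        this.sequenceNumber = sequenceNumber;
        this.aggregated = aggregated;
    }
}
```

Under this sketch's assumptions, a checkpoint taken after only sub-record 5:0 has been emitted still records 4, so 5:0 would be read and emitted again after a restore unless checkpoints are held back until the batch finishes; storing the sub-sequence number, as in the updated description, avoids that re-emission.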


> Kinesis consumer not exactly-once if stopped in the middle of processing 
> aggregated records
> ---
>
> Key: FLINK-4080
> URL: https://issues.apache.org/jira/browse/FLINK-4080
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.1.0
>
>
> I've occasionally seen ManualExactlyOnceTest fail when running it several
> times.
> Kinesis records of the same aggregated batch have the same sequence number
> but different sub-sequence numbers
> (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
> The consumer currently commits state every time it finishes processing a
> record, including de-aggregated ones. This is a bug: since the state holds
> only the sequence number, committing it before a batch is finished
> incorrectly marks all remaining records of the de-aggregated batch as
> processed.
> Proposed fix:
> 1. Use the extended `UserRecord` class in KCL to represent all records 
> (either non- or de-aggregated) instead of the basic `Record` class. This 
> gives access to whether or not the record was originally aggregated.
> 2. The sequence number state we checkpoint needs to be able to indicate that
> the last seen sequence number of a shard may belong to a de-aggregated
> record, e.g., {"shard0" -> "5:8", "shard1" -> "2"} means that sub-record 8 of
> record 5 was the last one seen for shard 0. On restore, we start again from
> record 5 for shard 0 and skip every sub-record up to and including
> sub-record 8; for shard 1, however, we start from record 3, since record 2 is
> non-aggregated and already fully processed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records

2016-06-15 Thread Tzu-Li (Gordon) Tai (JIRA)


Tzu-Li (Gordon) Tai updated FLINK-4080:
---
Description: 
I've occasionally seen ManualExactlyOnceTest fail when running it several
times.

Kinesis records of the same aggregated batch have the same sequence number but
different sub-sequence numbers
(http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
The consumer currently commits state every time it finishes processing a
record, including de-aggregated ones. This is a bug: since the state holds only
the sequence number, committing it before a batch is finished incorrectly marks
all remaining records of the de-aggregated batch as processed.

Proposed fix: Use the extended `UserRecord` class in KCL to represent all
records (either non- or de-aggregated) instead of the basic `Record` class.
This gives access to whether or not the record was originally aggregated. If we
encounter a de-aggregated record, don't update state until we have finished
processing the last record of its batch.

  was:
I've occasionally seen ManualExactlyOnceTest fail when running it several
times.

Kinesis records of the same aggregated batch have the same sequence number but
different sub-sequence numbers
(http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
The consumer currently commits state every time it finishes processing a
record, including de-aggregated ones. This is a bug: since the state holds only
the sequence number, committing it before a batch is finished incorrectly marks
all remaining records of the de-aggregated batch as processed.

Proposed fix: Use the extended `UserRecord` class in KCL to represent all
records (either non- or de-aggregated) instead of the basic `Record` class.
This gives access to whether or not the record was originally aggregated. If we
encounter a de-aggregated record, don't update state until we have finished
processing the last record of its batch.


> Kinesis consumer not exactly-once if stopped in the middle of processing 
> aggregated records
> ---
>
> Key: FLINK-4080
> URL: https://issues.apache.org/jira/browse/FLINK-4080
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.1.0
>
>
> I've occasionally seen ManualExactlyOnceTest fail when running it several
> times.
> Kinesis records of the same aggregated batch have the same sequence number
> but different sub-sequence numbers
> (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
> The consumer currently commits state every time it finishes processing a
> record, including de-aggregated ones. This is a bug: since the state holds
> only the sequence number, committing it before a batch is finished
> incorrectly marks all remaining records of the de-aggregated batch as
> processed.
> Proposed fix: Use the extended `UserRecord` class in KCL to represent all
> records (either non- or de-aggregated) instead of the basic `Record` class.
> This gives access to whether or not the record was originally aggregated. If
> we encounter a de-aggregated record, don't update state until we have
> finished processing the last record of its batch.


