[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector

2016-07-30 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15400533#comment-15400533
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 7/30/16 8:15 AM:
-

Now that the major functions of the Kinesis connector for 1.1 are resolved, and 
the remaining sub-tasks of this JIRA aren't planned to be worked on or merged in 
the upcoming 1.1 release, I'd like to move the remaining sub-tasks out as 
standalone issues and close this JIRA once the 1.1 release is confirmed.


was (Author: tzulitai):
Now that the major functions of the Kinesis connector is resolved, and the 
remaining sub-tasks of this JIRA isn't planned to be worked on or merged in the 
upcoming 1.1 release, I'd like to move out the remaining sub-tasks as 
standalone issues and close this JIRA.

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis would be a 
> great addition to Flink's handful of streaming connectors to external 
> systems, and great outreach to the AWS community.
> AWS supports two different ways to consume Kinesis data: the low-level AWS 
> SDK [1], or the high-level KCL (Kinesis Client Library) [2]. The AWS SDK can 
> be used to consume Kinesis data, including reading a stream starting from a 
> specific offset (a "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends the KCL, which offers a higher level 
> of abstraction that also comes with checkpointing and failure recovery, 
> using a KCL-managed AWS DynamoDB "lease table" as the checkpoint state 
> storage.
> However, the KCL is essentially a stream processing library that wraps all 
> the partition-to-task (or "shard" in Kinesis terminology) assignment and 
> checkpointing, letting the user focus only on streaming application logic. 
> This means we cannot use the KCL to implement the Kinesis streaming 
> connector if we are aiming for a deep integration of Flink with Kinesis 
> that provides exactly-once guarantees (the KCL promises only 
> at-least-once). Therefore, the AWS SDK will be the way to go for the 
> implementation of this feature.
> Given the ability to read from specific offsets, and the fact that Kinesis 
> and Kafka share many similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will closely resemble 
> the Kafka connector. We can largely follow the outline in [~StephanEwen]'s 
> description [3] and [~rmetzger]'s Kafka connector implementation [4]. A few 
> tweaks due to differences between Kinesis and Kafka are described below:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource than a Kafka topic. It would be great to hear more 
> thoughts on this part.
> 2. While Kafka has brokers that can each hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is the "shard". Therefore, in 
> contrast to the Kafka connector's per-broker connections, where each 
> connection can handle multiple Kafka partitions, the Kinesis connector will 
> only need simple per-shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If 
> we were to implement this feature like the Kafka connector, which uses 
> Kafka / ZK to sync an outside view of progress, we could probably use ZK or 
> DynamoDB the way the KCL does. More thoughts on this part would be very 
> helpful too.
> As for the Kinesis sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the 
> proposed Kinesis consumer, I think it will be better to stick with the AWS 
> SDK for the implementation. The implementation should be straightforward, 
> being almost if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998
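The description's point about reading from a specific sequence number is the crux of the restore behavior. The following is a minimal, hypothetical sketch (not the connector's actual code; class and method names are illustrative) of the decision a restored consumer would make per shard: resume right after a checkpointed sequence number when one exists, otherwise start from the oldest available record.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: choosing the Kinesis shard-iterator type on (re)start.
public class ShardRestorePolicy {

    /** Returns the iterator type to request when starting a read on a shard. */
    public static String iteratorTypeFor(Map<String, String> checkpointedSeqNums, String shardId) {
        // AFTER_SEQUENCE_NUMBER resumes directly after the last record emitted
        // before the checkpoint; TRIM_HORIZON starts at the oldest record still
        // retained in the shard (i.e. a fresh start).
        return checkpointedSeqNums.containsKey(shardId)
                ? "AFTER_SEQUENCE_NUMBER"
                : "TRIM_HORIZON";
    }

    public static void main(String[] args) {
        Map<String, String> checkpoint = new HashMap<>();
        checkpoint.put("shardId-000000000000", "49545115243490985018280067714973144582180062593244200961");
        System.out.println(iteratorTypeFor(checkpoint, "shardId-000000000000")); // AFTER_SEQUENCE_NUMBER
        System.out.println(iteratorTypeFor(checkpoint, "shardId-000000000001")); // TRIM_HORIZON
    }
}
```

In the real consumer the sequence-number map would come from Flink's checkpointed state, and the iterator type would be passed to the Kinesis GetShardIterator call.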

[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector

2016-04-15 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15243988#comment-15243988
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 4/16/16 3:53 AM:
-

https://github.com/tzulitai/flink/tree/FLINK-3229/flink-streaming-connectors/flink-connector-kinesis

Here is the initial working version of FlinkKinesisConsumer that I have been 
testing in non-production environments, updated to reflect the recent Flink 
1.0 changes.

The description of https://issues.apache.org/jira/browse/FLINK-3229 contains 
example code showing how to use the consumer.

I'm still refactoring the code a bit for easier unit testing. The PR will 
follow very soon.


was (Author: tzulitai):
https://github.com/tzulitai/flink/tree/FLINK-3229/flink-streaming-connectors/flink-connector-kinesis

Here is the initial working version of FlinkKinesisConsumer that I have been 
testing in off-production environments, updated corresponding to the recent 
Flink 1.0 changes.
I'm still refactoring the code just a bit for easier unit tests. The PR will be 
very soon.


[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206770#comment-15206770
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 3/22/16 4:52 PM:
-

No problem :) I'll have it ready soon, hopefully by the end of March.


was (Author: tzulitai):
No problem :) I'll have it ready soon.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-13 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15095796#comment-15095796
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 1/13/16 8:11 AM:
-

Very happy to help out and contribute!


was (Author: tzulitai):
Very happy to help out and contribute :)






[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-13 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15095795#comment-15095795
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 1/13/16 9:50 AM:
-

Opening separate sub-tasks for source consumer / sink producer.
The consumer sub-task has an example of the expected FlinkKinesisConsumer API: 
https://issues.apache.org/jira/browse/FLINK-3229
I'll be posting any future consumer-specific comments in the sub-task.


was (Author: tzulitai):
Opening separate sub-tasks for source consumer / sink producer.
The consumer sub-task has an example of the expected FlinkKinesisConsumer API: 
https://issues.apache.org/jira/browse/FLINK-3229
I'll post any future consumer-specific comments in the sub-task.


[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-13 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15095795#comment-15095795
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 1/13/16 9:49 AM:
-

Opening separate sub-tasks for source consumer / sink producer.
The consumer sub-task has an example of the expected FlinkKinesisConsumer API: 
https://issues.apache.org/jira/browse/FLINK-3229
I'll post any future consumer-specific comments in the sub-task.


was (Author: tzulitai):
Opening separate sub-tasks for source consumer / sink producer.
The consumer sub-task has an example of the expected FlinkKinesisConsumer API: 
https://issues.apache.org/jira/browse/FLINK-3229


[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics

2016-01-10 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15090974#comment-15090974
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 1/10/16 1:30 PM:
-

Re-clarification of why using the KCL is not viable if we want a deep 
integration of Kinesis with Flink's checkpointing & distributed snapshot 
mechanics for exactly-once guarantees:

After a closer look at the KCL's documentation and source code, I realized 
that the KCL's own design for parallel consumption of Kinesis data works 
pretty much like our Flink Kafka connector as a whole: each instance of a KCL 
application is a "worker", which consumes records from the multiple Kinesis 
"shards" (partitions) assigned to it in parallel threads. The KCL handles 
which shards are assigned to which workers, and can do this dynamically, in 
the sense that when new worker instances are discovered, shard-to-worker 
assignments can be reassigned at runtime.

Therefore, it seems reasonable to view all Flink consumer tasks of the Kinesis 
connector as the instance pool of a single KCL application. The implementation 
will simply be to instantiate and run a KCL worker on each consumer task. KCL 
can handle the shard-to-worker assignment after each consumer task starts 
running.

Unfortunately, there are a few shortcomings to using the KCL:

1. The method for accessing the shard info assigned to a Worker instance is 
private.
2. The logic of where a Worker instance continues reading a shard after 
restore is deep inside the KCL code and cannot be externally configured. It 
is hardcoded to read from the KCL-managed checkpoint table (the DynamoDB 
"lease table").

These two problems eventually led me to the conclusion that the KCL cannot 
work with Flink's checkpoint and restore mechanics. I've been looking through 
the KCL source code for hacky ways to bypass the above issues, but have come 
up short.

Therefore, the AWS SDK is the way to go. Although I will have to redo quite a 
bit of work that the KCL already covers, given that the result would be the 
only processing engine for Kinesis currently available with built-in 
exactly-once guarantees, I think it is worth the effort. The tasks we will 
have to work on include:

Task 1: Shard-to-task assignment for parallel data consumption.
Task 2: Assigning multiple streams to the consumer (this is actually still a 
wish-list feature for the KCL [1]).
Task 3: Handling Kinesis-side resharding when Kinesis shards are merged or 
split.
Task 4: If Flink ever supports dynamic reconfiguration of task parallelism 
(perhaps for higher throughput), we will also need to handle Flink-side 
parallelism changes.

I propose that the scope of this JIRA focus only on Tasks 1 and 2; a lot of 
the implementation can be based on the Kafka connector.
Task 3 involves extensive work that isn't necessary for Kinesis to work 
properly with Flink, and is out of the scope of this ticket.
Task 4 is blocked on the availability of dynamic task parallelism 
reconfiguration at runtime.

Ref:
[1] https://github.com/awslabs/amazon-kinesis-client-nodejs/issues/4
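To make Task 1 concrete: one possible scheme, similar in spirit to how the Kafka connector spreads partitions over subtasks, is round-robin assignment of shard indices to consumer subtasks. A minimal sketch under that assumption (the class and method names are hypothetical, not connector code):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of Task 1 (not actual connector code): spread Kinesis
// shards over parallel Flink consumer subtasks with a simple round-robin
// scheme, similar in spirit to how the Kafka connector assigns partitions.
public class ShardAssigner {

    // Returns the shard indices owned by the given subtask. Shard i goes to
    // subtask (i % parallelism), so every shard has exactly one owner and the
    // shards are spread evenly across subtasks.
    public static List<Integer> assignShards(int totalShards, int parallelism, int subtaskIndex) {
        List<Integer> assigned = new ArrayList<>();
        for (int shard = 0; shard < totalShards; shard++) {
            if (shard % parallelism == subtaskIndex) {
                assigned.add(shard);
            }
        }
        return assigned;
    }
}
```

With 8 shards and a parallelism of 3, subtask 0 would own shards 0, 3, and 6. Note that resharding (Task 3) would require recomputing this mapping at runtime, which is one reason it is scoped out here.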


[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics

2016-01-10 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15091050#comment-15091050
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 1/10/16 1:56 PM:
-

The work is currently in progress; I should be ready to open a pull request 
within a week.



> Add AWS Kinesis streaming connector with deep integration with Flink's 
> checkpointing mechanics
> --
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud-service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great way to reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using KCL, which offers a higher-level 
> of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB "leash table" as the checkpoint state 
> storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we can not use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly once guarantees (KCL promises only 
> at-least-once). Therefore, AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and also the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outlines described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to some Kinesis vs. Kafka differences 
> are described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector having per broker connections where the connections 
> can handle multiple Kafka partitions, the Kinesis connector will only need to 
> have simple per shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature the way the Kafka connector uses Kafka / ZK to 
> sync an outside view of progress, we could probably use ZK or DynamoDB, like 
> the KCL does. More thoughts on this part would be very helpful too.
> As for the Kinesis sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis consumer, I think it will be better to stick with the AWS SDK for the 
> implementation, which should be straightforward, being almost if not 
> completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> 
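The key enabler in the description above, reading a shard from a specific sequence number, is what would let a restored Flink task resume exactly where a checkpoint left off. A self-contained sketch of that resume semantics, with an in-memory list of sequence numbers standing in for the SDK's shard-iterator calls (class and method names are illustrative, not connector code):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not connector code) of the resume semantics the SDK
// enables: because a shard iterator can start after a given sequence number,
// a restored task can continue a shard exactly past the last record covered
// by a Flink checkpoint. An in-memory list of sequence numbers stands in for
// the real GetShardIterator / GetRecords calls.
public class SequenceResume {

    // Returns all records whose sequence number is strictly greater than the
    // checkpointed one; a null checkpoint means "read from the beginning".
    public static List<Long> readAfterCheckpoint(List<Long> shard, Long checkpointSeq) {
        List<Long> out = new ArrayList<>();
        for (long seq : shard) {
            if (checkpointSeq == null || seq > checkpointSeq) {
                out.add(seq);
            }
        }
        return out;
    }
}
```

On restore, the connector would feed each shard's checkpointed sequence number back into the shard iterator request, so no record is re-emitted and none is skipped.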

[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15090928#comment-15090928
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 1/10/16 6:40 AM:
-

A follow-up regarding whether or not the Kinesis streaming connector should 
support parallel data pulling from multiple Kinesis streams.

It makes sense to pull from multiple Kinesis streams if they are in the same 
AWS region, since globally deployed pipelines usually follow a 
central-regional practice, where a message queue like Kinesis / Kafka is also 
deployed in a central hub to ingest intermediate results from the regional 
hubs.

Therefore, it is unlikely that there are use cases where Flink will need to 
consume data from cross-region AWS Kinesis streams. I think it's a good 
balance to design the Flink Kinesis streaming connector to support parallel 
pulling from multiple Kinesis streams, with the restriction that they are all 
deployed in the same AWS region and all have identical access settings (i.e., 
they can all be accessed with the same single AWS credential).
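The proposed restriction, one region and one credential per consumer, could be enforced up front when the consumer is configured. A hypothetical sketch (StreamSpec and the helper are illustrative; they are not actual connector API):

```java
import java.util.List;

// Hypothetical sketch (not actual connector API) of enforcing the proposed
// restriction up front: all subscribed streams must live in one AWS region
// (an analogous check would cover sharing a single AWS credential).
public class MultiStreamConfig {

    public static final class StreamSpec {
        public final String name;
        public final String region;

        public StreamSpec(String name, String region) {
            this.name = name;
            this.region = region;
        }
    }

    // A subscription is valid only if it is non-empty and every stream is
    // deployed in the same AWS region.
    public static boolean isValidSubscription(List<StreamSpec> streams) {
        if (streams.isEmpty()) {
            return false;
        }
        String region = streams.get(0).region;
        for (StreamSpec s : streams) {
            if (!region.equals(s.region)) {
                return false;
            }
        }
        return true;
    }
}
```

Failing fast at configuration time keeps the runtime code free of cross-region concerns, which matches the scoping argument above.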


