[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-08-19 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15429114#comment-15429114
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


Closing this now.
The remaining issues will be moved to standalone JIRAs.

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Affects Versions: 1.1.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Original Estimate: 336h
> Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a
> great addition to Flink's handful of streaming connectors to external
> systems and a good way to reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. The AWS
> SDK can be used to consume Kinesis data, including reading a stream from a
> specific offset (or "record sequence number" in Kinesis terminology). On the
> other hand, AWS officially recommends using KCL, which offers a higher level
> of abstraction that also comes with checkpointing and failure recovery by
> using a KCL-managed AWS DynamoDB "lease table" as the checkpoint state
> storage.
> However, KCL is essentially a stream processing library that wraps all the
> partition-to-task assignment (a partition is a "shard" in Kinesis
> terminology) and checkpointing to allow the user to focus only on streaming
> application logic. This means that we cannot use the KCL to implement the
> Kinesis streaming connector if we are aiming for a deep integration of Flink
> with Kinesis that provides exactly-once guarantees (KCL promises only
> at-least-once). Therefore, the AWS SDK will be the way to go for the
> implementation of this feature.
> With the ability to read from specific offsets, and given that
> Kinesis and Kafka share a lot of similarities, the basic principles of the
> implementation of Flink's Kinesis streaming connector will very much resemble
> the Kafka connector. We can basically follow the outline in
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector
> implementation [4]. A few tweaks due to differences between Kinesis and
> Kafka are described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast
> to the Kafka connector, which has per-broker connections that can each
> handle multiple Kafka partitions, the Kinesis connector will only need
> simple per-shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we
> were to implement this feature like the Kafka connector, which uses Kafka / ZK
> to sync an outside view of progress, we could probably use ZK or DynamoDB the
> way KCL does. More thoughts on this part will be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis
> Producer Library) [5]. However, for higher code consistency with the proposed
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the
> implementation. The implementation should be straightforward, being almost
> if not completely the same as the Kafka sink.
> References:
> [1] http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998
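
The consumer idea in the description above (per-shard reads with the AWS SDK, resuming from a record sequence number) could be sketched roughly as follows. This is only an illustration, not the connector's actual code: `ShardProgress` and its methods are hypothetical names, no AWS or Flink API is called, and it assumes that sequence numbers are decimal strings that increase within a shard. It shows how tracking the last-read sequence number per shard lets a checkpoint capture the read position and restore it later.

```java
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: remembers the last sequence number read from each shard. */
final class ShardProgress {
    private final Map<String, String> lastSeqByShard = new HashMap<>();

    /** Records that a record with the given sequence number was emitted for a shard. */
    void advance(String shardId, String sequenceNumber) {
        String previous = lastSeqByShard.get(shardId);
        // Assumption: sequence numbers are decimal strings that increase within a shard.
        if (previous != null
                && new BigInteger(sequenceNumber).compareTo(new BigInteger(previous)) <= 0) {
            throw new IllegalArgumentException(
                    "sequence number did not increase for " + shardId);
        }
        lastSeqByShard.put(shardId, sequenceNumber);
    }

    /** Returns a copy of the state, suitable for storing in a checkpoint. */
    Map<String, String> snapshot() {
        return new HashMap<>(lastSeqByShard);
    }

    /** Replaces the state with a previously taken snapshot, e.g. after a failure. */
    void restore(Map<String, String> snapshot) {
        lastSeqByShard.clear();
        lastSeqByShard.putAll(snapshot);
    }

    /** Sequence number to resume after, or null if the shard has not been read yet. */
    String resumeAfter(String shardId) {
        return lastSeqByShard.get(shardId);
    }
}
```

With the AWS SDK, the restored value would typically be used to request a shard iterator of type `AFTER_SEQUENCE_NUMBER`, so that reading continues right after the last checkpointed record.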



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-07-30 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15400533#comment-15400533
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


Now that the major functions of the Kinesis connector are resolved, and the 
remaining sub-tasks of this JIRA aren't planned to be worked on or merged in the 
upcoming 1.1 release, I'd like to move out the remaining sub-tasks as 
standalone issues and close this JIRA.



[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-05-28 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15305768#comment-15305768
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


I just noticed the conversation on PR #2016. Let's wait until we decide whether 
to drop the dependency ;)



[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-05-28 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15305760#comment-15305760
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


[~rmetzger]
Hi Robert,
I'm back and ready to continue enhancing the Kinesis connector. Thank you for 
the merge on PR #1911.
Just a check before working on the new issues: are we still going for the 
approach of removing both KPL & KCL from the connector and using the basic SDK 
only? I have WIP for this: 
https://github.com/tzulitai/flink/tree/FLINK-3229-RemoveKclKplRework (not yet 
rebased on the recent changes)


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-05-13 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15282581#comment-15282581
 ] 

Robert Metzger commented on FLINK-3211:
---

I agree with merging the Kinesis connector from pull request #1911 and then 
using follow-up JIRAs and pull requests to further enhance it.

We can even release Flink 1.1 without providing a binary for Kinesis.



[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-05-13 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15282577#comment-15282577
 ] 

Robert Metzger commented on FLINK-3211:
---

I think we have to support record deaggregation. It can happen that we are 
consuming a stream that was produced by a system using the KPL.

I agree that the out-of-sync configuration is not nice currently. I think we 
should change the producer and make it like the consumer. Then it is also 
similar to our Kafka connector code.
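
On the deaggregation point above, a consumer first has to tell whether a record was aggregated by the KPL at all. The following is a rough sketch of such a check, not the connector's code: `KplAggregationCheck` and its methods are hypothetical names, and the layout it assumes (a 4-byte magic prefix, a protobuf payload, then a 16-byte MD5 of that payload) is my reading of the published KPL aggregated record format. It does not parse the protobuf payload into individual user records.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

/** Hypothetical helper: detects whether a Kinesis record blob looks KPL-aggregated. */
final class KplAggregationCheck {
    // Assumed magic prefix of a KPL aggregated record.
    private static final byte[] MAGIC = {(byte) 0xF3, (byte) 0x89, (byte) 0x9A, (byte) 0xC2};
    // Assumed trailer: MD5 digest of the payload between prefix and trailer.
    private static final int MD5_LENGTH = 16;

    /** Computes the MD5 digest of the given bytes. */
    static byte[] md5(byte[] payload) {
        try {
            return MessageDigest.getInstance("MD5").digest(payload);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    /** Returns true if the blob has the magic prefix and a matching MD5 trailer. */
    static boolean looksAggregated(byte[] data) {
        if (data == null || data.length < MAGIC.length + MD5_LENGTH) {
            return false;
        }
        for (int i = 0; i < MAGIC.length; i++) {
            if (data[i] != MAGIC[i]) {
                return false;
            }
        }
        byte[] payload = Arrays.copyOfRange(data, MAGIC.length, data.length - MD5_LENGTH);
        byte[] trailer = Arrays.copyOfRange(data, data.length - MD5_LENGTH, data.length);
        return Arrays.equals(md5(payload), trailer);
    }
}
```

Records that fail this check would be passed through as ordinary single records; records that pass would need their payload decoded into the contained user records.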


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-05-13 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15282573#comment-15282573
 ] 

Robert Metzger commented on FLINK-3211:
---

I use this code: https://github.com/rmetzger/flink-kinesis-test
I build the Kinesis connector, then the flink-kinesis-test project, and then I 
start the data consumer and the data generator.


> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using KCL, which offers a higher-level 
> of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB "leash table" as the checkpoint state 
> storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we can not use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly once guarantees (KCL promises only 
> at-least-once). Therefore, AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and also the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outlines described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to some of Kinesis v.s. Kafka 
> differences is described as following:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector having per broker connections where the connections 
> can handle multiple Kafka partitions, the Kinesis connector will only need to 
> have simple per shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector with Kafka / ZK to 
> sync outside view of progress, we probably could use ZK or DynamoDB like the 
> way KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straight forward, being almost 
> if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998
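The description's point about resuming reads from a specific sequence number can be illustrated with a minimal sketch. It assumes the connector keeps one "last emitted sequence number" per shard, copies that map at each checkpoint, and on recovery resumes each shard after the stored number (the Kinesis API offers an `AFTER_SEQUENCE_NUMBER` shard iterator type for this purpose). The class name `ShardSequenceState`, its methods, and the shard ID `shard-0` are hypothetical and are not taken from the connector code; no AWS calls are made here.

```java
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: tracks the last emitted sequence number per shard. */
final class ShardSequenceState {
    // shard ID -> sequence number of the last record emitted downstream
    private final Map<String, BigInteger> lastEmitted = new HashMap<>();

    /** Records that a record with this sequence number was emitted. */
    void advance(String shardId, String sequenceNumber) {
        lastEmitted.put(shardId, new BigInteger(sequenceNumber));
    }

    /** Returns a copy taken at a checkpoint; later advance() calls do not change it. */
    Map<String, BigInteger> snapshot() {
        return new HashMap<>(lastEmitted);
    }

    /** Replaces the current state with a checkpoint copy after a failure. */
    void restore(Map<String, BigInteger> checkpoint) {
        lastEmitted.clear();
        lastEmitted.putAll(checkpoint);
    }

    /** Returns the number to resume after, or null if nothing was emitted for the shard. */
    BigInteger resumeAfter(String shardId) {
        return lastEmitted.get(shardId);
    }

    public static void main(String[] args) {
        ShardSequenceState state = new ShardSequenceState();
        state.advance("shard-0", "100");
        Map<String, BigInteger> checkpoint = state.snapshot();
        state.advance("shard-0", "101"); // emitted after the checkpoint
        state.restore(checkpoint);       // simulated failure and recovery
        System.out.println(state.resumeAfter("shard-0")); // prints 100
    }
}
```

Because the record with sequence number 101 was emitted after the checkpoint, it is read again after recovery. This replay from checkpointed positions is the behaviour the Kafka connector relies on as well.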



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-05-11 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280969#comment-15280969
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


[~rmetzger]
Hi Robert,
Can you tell me how you were testing the Kinesis connector in AWS EMR before? 
I'd like to try it myself too when I get back, also to learn the process :) 
Thanks.



[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-05-11 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15279737#comment-15279737
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


https://github.com/tzulitai/flink/tree/FLINK-3229-RemoveKclKplRework

The above link is the separate branch I have been working on for the changes 
related to the KCL/KPL dependency rework. I've finished replacing the KPL in 
the producer; please feel free to test and review it before we decide whether 
to open a separate issue & PR for this or merge it into the original PR.

As for the KCL in the consumer, where we only use the 'UserRecord' class for 
deaggregating records, I'm currently a bit stuck because the original 
implementation of the class in the KCL uses protobuf-generated code, and I'm 
not sure whether it is appropriate to simply make a copy of it in the 
connector. On the other hand, whether we should still deaggregate records at 
all is debatable, because record aggregation is essentially a built-in 
feature of the KPL, and we aren't using the KPL anymore.

Besides the KCL, some more tasks to do for this rework:
1. Tests for the new classes introduced with the rework on the producer
2. I'd like to unify how the consumer and producer are configured. Currently 
they take different approaches, which is kind of strange since they are in the 
same connector. I'll probably go for an approach more like the previous 
producer.
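
A minimal sketch of what a unified configuration could look like, assuming both the consumer and the producer accept a single `java.util.Properties` object and validate the shared keys in the same way. The class name `KinesisConfigSketch`, the key names, and the example values are hypothetical and are not the connector's actual configuration.

```java
import java.util.Properties;

/** Hypothetical shared configuration keys; not the connector's actual names. */
final class KinesisConfigSketch {
    static final String REGION = "aws.region";
    static final String ACCESS_KEY = "aws.credentials.accesskeyid";
    static final String SECRET_KEY = "aws.credentials.secretkey";

    /** Fails fast if a key needed by both consumer and producer is missing. */
    static void validate(Properties config) {
        for (String key : new String[] {REGION, ACCESS_KEY, SECRET_KEY}) {
            if (config.getProperty(key) == null) {
                throw new IllegalArgumentException("Missing required property: " + key);
            }
        }
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(REGION, "us-east-1");
        config.setProperty(ACCESS_KEY, "example-key-id");
        config.setProperty(SECRET_KEY, "example-secret");
        // The same object would be handed to both the consumer and the producer.
        validate(config);
        System.out.println("configuration ok");
    }
}
```

With one set of keys and one validation path, a misconfiguration is reported identically regardless of which side of the connector is in use.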


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-05-09 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15276392#comment-15276392
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


On the other hand, if we're aiming for the Kinesis connector to be available in 
Flink 1.1.0, and 1.1 won't be scheduled for release any time before June, then 
perhaps it's also fine to stick with the current ticket / PR for further 
changes.
Either way, I'll have a first version of the Java-SDK-only connector by 
tomorrow so that we can start testing it.


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-05-09 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15276381#comment-15276381
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


[~rmetzger]
Hi Robert,
I'm about halfway through the rework that removes the KCL/KPL dependency from 
the connector code, and it should be ready to submit tomorrow. 

However, since the change will differ substantially from the producer code 
merged with https://issues.apache.org/jira/browse/FLINK-3230, I am wondering 
whether it would be more suitable to open a separate JIRA issue / PR for this 
change, and to first merge the current state of the Kinesis connector as of 
https://github.com/apache/flink/pull/1911 (if you think it is stable enough to 
merge, of course), following your suggestion of an optional module that is not 
built in releases and that people will have to build themselves.

Another main reason I propose submitting this change as a separate PR is that 
I won't have access to a computer after 5/11 (2 days from now) until 6/1. The 
change to the current state of the connector will certainly need more testing 
and reviewing, and I won't be able to address any issues during my leave. This 
will certainly postpone the availability of the Kinesis connector in Flink, at 
least until June when I'm back.

Please let me know what you think :)


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-05-07 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15275172#comment-15275172
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


If you want, I can start looking into it tomorrow. I have the next 3 days free 
to work on it :)



[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-05-07 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15275162#comment-15275162
 ] 

Robert Metzger commented on FLINK-3211:
---

That's a good idea.
I think we need to figure out how hard it would be to implement a producer 
without using the producer library. 



[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-05-07 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15275136#comment-15275136
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


[~rmetzger]
Perhaps there is one more possible solution: use only the AWS Java SDK for the 
connector to avoid the protobuf version clash issue?
The consumer only uses one class (UserRecord) from the KCL, which we can copy 
into the connector code with a note on its AWS source, and it shouldn't be too 
hard to replace the KPL code in the producer either.

It might not be the cleanest approach, though. What do you think? If none of 
this works, I'm fine with publishing it as a dataArtisans GitHub project until 
cleaner integration into the Flink codebase is possible :)

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h

[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-05-06 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15274119#comment-15274119
 ] 

Robert Metzger commented on FLINK-3211:
---

[~tzulitai] Would you be okay with publishing the Kinesis connector as a GitHub 
project? Maybe in the dataArtisans GitHub, as "flink-connectors" or so?

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h





[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-05-06 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15274018#comment-15274018
 ] 

Robert Metzger commented on FLINK-3211:
---

I've spent quite some time getting the Kinesis connector to work properly with 
Flink, but there is one big issue: the Kinesis Producer Library and the Kinesis 
Client Library both depend on protobuf-java 2.6.1, while Flink uses protobuf 
2.5 (mainly forced by Akka).

I tried the following approaches:
- Exclude the protobuf-java dependency from Akka --> doesn't work because it 
seems to be hardwired somehow.
- Upgrade to Akka 2.4.0, which doesn't depend on protobuf anymore --> doesn't 
work because Akka 2.4 depends on Java 8 (and Scala 2.11).
- Shade the Kinesis connector's protobuf dependency into the 
"flink-connector-kinesis" jar --> works, but we cannot do it like this due to 
legal restrictions: the Amazon Software License restricts the use of the 
software to AWS services, which is not compatible with the ASF.

I see the following solutions:
- We merge the Kinesis code into master as an optional module that is not 
built for releases. People have to build it themselves.
- We host the Kinesis connector in a separate project (outside of the ASF).
- We ask Amazon to exclude the protobuf dependency.
- I try again using Akka without protobuf.
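
When chasing a version clash like the protobuf one described above, it can 
help to confirm which jar a class is actually loaded from at runtime. The 
following is a minimal diagnostic sketch using only the JDK. The helper class 
ClassOrigin and its describe method are hypothetical names for illustration 
and are not part of Flink, Akka, or the AWS libraries.

```java
import java.security.CodeSource;

/** Hypothetical diagnostic helper: reports where a class was loaded from. */
final class ClassOrigin {

    /**
     * Returns the location (jar or directory URL) the named class was loaded from,
     * "<no code source>" if the JVM reports none (typical for core JDK classes),
     * or "<not on classpath>" if the class cannot be loaded.
     */
    static String describe(String className) {
        try {
            // Load without running static initializers; we only need the metadata.
            Class<?> clazz =
                Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return "<no code source>";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "<not on classpath>";
        }
    }

    public static void main(String[] args) {
        // com.google.protobuf.Message is a class shipped in protobuf-java.
        System.out.println(describe("com.google.protobuf.Message"));
        System.out.println(describe("java.lang.String"));
    }
}
```

Run inside the job's classpath, the first line shows which protobuf-java jar 
won. If the location points at the 2.5 jar while the Kinesis libraries expect 
2.6.1, that matches the conflict described in this comment.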

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h

[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-04-18 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15245562#comment-15245562
 ] 

Robert Metzger commented on FLINK-3211:
---

According to LEGAL-198, we have to make the streaming connector optional.

I'll soon open a pull request for FLINK-3230 with the required infrastructure.

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h





[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-04-18 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15245368#comment-15245368
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


Hi Robert,

That would be great! Looking forward to it, thanks.

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-04-15 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15243988#comment-15243988
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


https://github.com/tzulitai/flink/tree/FLINK-3229/flink-streaming-connectors/flink-connector-kinesis

Here is the initial working version of FlinkKinesisConsumer, which I have been 
testing in non-production environments and have updated for the recent 
Flink 1.0 changes.
I'm still refactoring the code a bit to make unit testing easier. The PR will 
follow very soon.

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h

[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-04-15 Thread Marut Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15243940#comment-15243940
 ] 

Marut Singh commented on FLINK-3211:


Is there any update on this issue?



> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using KCL, which offers a higher-level 
> of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB "leash table" as the checkpoint state 
> storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we can not use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly once guarantees (KCL promises only 
> at-least-once). Therefore, AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and also the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outlines described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to some of Kinesis v.s. Kafka 
> differences is described as following:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector having per-broker connections where the connections 
> can handle multiple Kafka partitions, the Kinesis connector will only need to 
> have simple per-shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector with Kafka / ZK to 
> sync an outside view of progress, we could probably use ZK or DynamoDB, the 
> way KCL does. More thoughts on this part would be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straightforward, being almost 
> if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998
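
The consumer design in the description above (one simple reader per shard, resuming from a stored sequence number instead of committing offsets back to Kinesis) can be illustrated with a minimal sketch. Everything here is a hypothetical stand-in: InMemoryShard, ShardedConsumer and demo are made-up names, sequence numbers are plain integers for readability (Kinesis uses large numeric strings), and no AWS SDK call is made. With the real SDK, the resume step would presumably map to a shard iterator of type AFTER_SEQUENCE_NUMBER.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

Record = Tuple[int, str]  # (sequence number, payload); ints are a simplification


@dataclass
class InMemoryShard:
    """Hypothetical stand-in for one Kinesis shard: an append-only record log."""
    records: List[Record] = field(default_factory=list)

    def read_after(self, last_seq: Optional[int], limit: int) -> List[Record]:
        """Return up to `limit` records with a sequence number above `last_seq`."""
        pending = [r for r in self.records if last_seq is None or r[0] > last_seq]
        return pending[:limit]


class ShardedConsumer:
    """Reads every shard independently; progress is a shard-id -> sequence-number map."""

    def __init__(self, shards: Dict[str, InMemoryShard]):
        self.shards = shards
        self.last_seq: Dict[str, Optional[int]] = {sid: None for sid in shards}

    def poll(self, limit_per_shard: int = 2) -> List[str]:
        """Fetch the next batch from each shard and advance the per-shard position."""
        out: List[str] = []
        for sid in sorted(self.shards):
            batch = self.shards[sid].read_after(self.last_seq[sid], limit_per_shard)
            for seq, payload in batch:
                out.append(payload)
                self.last_seq[sid] = seq
        return out

    def snapshot(self) -> Dict[str, Optional[int]]:
        """Copy of the positions, i.e. what a checkpoint would need to store."""
        return dict(self.last_seq)

    def restore(self, state: Dict[str, Optional[int]]) -> None:
        """Rewind to a previously taken snapshot, e.g. after a failure."""
        self.last_seq = dict(state)


def demo():
    shards = {
        "shard-0": InMemoryShard([(1, "a"), (2, "b"), (3, "c")]),
        "shard-1": InMemoryShard([(10, "x"), (11, "y")]),
    }
    consumer = ShardedConsumer(shards)
    first = consumer.poll(limit_per_shard=1)
    checkpoint = consumer.snapshot()
    consumer.poll(limit_per_shard=1)      # progress made after the checkpoint
    consumer.restore(checkpoint)          # simulated failure and recovery
    replayed = consumer.poll(limit_per_shard=5)
    return first, checkpoint, replayed
```

Because the positions are restored together with the rest of the state, records read after the checkpoint are read again after recovery, and nothing before the checkpoint is re-read. That is the property the description relies on when it prefers the SDK over the KCL.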



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206777#comment-15206777
 ] 

Robert Metzger commented on FLINK-3211:
---

Great, thank you. I'm looking forward to reviewing it.

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Affects Versions: 1.0.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Original Estimate: 336h
> Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using KCL, which offers a higher level 
> of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB "lease table" as the checkpoint state 
> storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we cannot use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly-once guarantees (KCL promises only 
> at-least-once). Therefore, the AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and also the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outlines described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to Kinesis vs. Kafka 
> differences are described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector having per-broker connections where the connections 
> can handle multiple Kafka partitions, the Kinesis connector will only need to 
> have simple per-shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector with Kafka / ZK to 
> sync an outside view of progress, we could probably use ZK or DynamoDB, the 
> way KCL does. More thoughts on this part would be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straightforward, being almost 
> if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998





[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206770#comment-15206770
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


No problem :)



[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206755#comment-15206755
 ] 

Robert Metzger commented on FLINK-3211:
---

Hi Tzu,
thank you for your response. I'm happy to see that you are concerned about 
having good tests for the code. We have those tests for all connectors which 
connect with an open source system, running on the JVM.

But, for example, our RabbitMQ connector also doesn't contain any tests 
connecting to a real RMQ. There are some tests using a mocked RMQ.
It's fine to follow the same approach with the Kinesis connector. Let's just 
manually test the code on AWS.

Please open a pull request as soon as you have it ready so that we can start 
reviewing it. It would be nice to put it into the Flink 1.1 release.
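
The mocked-service approach mentioned above might look roughly like the following sketch. It is written in Python for brevity rather than in the connector's own Java, and it is only an assumption about how such a test could be shaped. drain_shard and make_mock_client are hypothetical helpers. The client method names and response keys (get_shard_iterator, get_records, ShardIterator, Records, NextShardIterator) follow the boto3 Kinesis client as far as I know it, and stopping at the first empty batch is a simplification for testing.

```python
from unittest.mock import Mock


def drain_shard(client, stream_name, shard_id, max_calls=10):
    """Read one shard from its oldest record until an empty batch is returned.

    `client` is anything exposing boto3-style get_shard_iterator/get_records.
    """
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="TRIM_HORIZON",
    )["ShardIterator"]
    payloads = []
    for _ in range(max_calls):
        response = client.get_records(ShardIterator=iterator, Limit=100)
        if not response["Records"]:
            break  # simplification: treat an empty batch as "caught up"
        payloads.extend(record["Data"] for record in response["Records"])
        iterator = response.get("NextShardIterator")
        if iterator is None:
            break  # a closed shard has no further iterator
    return payloads


def make_mock_client():
    """Mocked client that serves two one-record batches and then an empty one."""
    client = Mock()
    client.get_shard_iterator.return_value = {"ShardIterator": "it-0"}
    client.get_records.side_effect = [
        {"Records": [{"SequenceNumber": "1", "Data": b"a"}], "NextShardIterator": "it-1"},
        {"Records": [{"SequenceNumber": "2", "Data": b"b"}], "NextShardIterator": "it-2"},
        {"Records": [], "NextShardIterator": "it-3"},
    ]
    return client
```

A test then only asserts on what the reading logic returned and which calls it made, so it can run as part of the build without any AWS access; the manual run on AWS remains a separate step.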




[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206739#comment-15206739
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


Hi, thanks Roy.

I don't think using the actual Kinesis API will be feasible since the tests 
will be part of the build.
I did take a look at kinesalite at first and gave it a try, but it was still 
quite tedious to integrate into the code base for the sake of tests, since it 
isn't written in Java/Scala.

I am still quite new to submitting PRs to the Flink community, and was 
uncertain about sending out the PR before everything was properly covered by tests.
Nevertheless, if integration tests aren't too much of an issue for the 
connector and the connector is urgently needed, I can start polishing things up 
and prepare the PR!


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206709#comment-15206709
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


Hi Roy & Robert

Apologies for the late reply.
I already have a working version that I have been using in non-production 
environments for a while now.
The main reason for not sending a PR yet is the lack of tests for 
the Kinesis producer. The main issue is with integration tests, which are hard 
to write since Kinesis is an AWS service and doesn't come with a local version.

If this isn't too much of an issue, I can certainly open a PR soon!


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Roy Ben-Alta (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206699#comment-15206699
 ] 

Roy Ben-Alta commented on FLINK-3211:
-

Hi Robert,

I am the Business Development Manager for Amazon Kinesis, and it would be great 
to have an Apache Flink connector for Amazon Kinesis. Please send me an email at 
benal...@amazon.com and we can chat over email. I will be happy to provide you 
with useful collateral regarding Amazon Kinesis streams.


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Roy Ben-Alta (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206716#comment-15206716
 ] 

Roy Ben-Alta commented on FLINK-3211:
-

Hi Tzu,

Kinesis does not come with a local version, but there is one I found on GitHub: 
https://github.com/mhart/kinesalite
However, you should have no issue testing on AWS using the Kinesis API (you 
can create a stream with x shards and delete it once your tests are 
completed).
Feel free to contact me at benaltar@amazon and I will be happy to provide you 
with some credits if needed for the sake of the tests.

Roy.
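
The create-then-delete pattern suggested above could be wrapped so that a failing test never leaves a stream behind. The following is a minimal sketch under stated assumptions, not a tested AWS integration: temporary_stream and RecordingClient are hypothetical names, and the client calls (create_stream, get_waiter("stream_exists"), delete_stream) are assumed to match the boto3 Kinesis client. The fake client exists only so the sketch runs without credentials.

```python
from contextlib import contextmanager


@contextmanager
def temporary_stream(client, stream_name, shard_count):
    """Create a stream for the duration of a test and always delete it afterwards."""
    client.create_stream(StreamName=stream_name, ShardCount=shard_count)
    try:
        # Wait until the stream is usable before handing it to the test body.
        client.get_waiter("stream_exists").wait(StreamName=stream_name)
        yield stream_name
    finally:
        # Runs on success and on failure, so no stream keeps accruing cost.
        client.delete_stream(StreamName=stream_name)


class RecordingClient:
    """Hypothetical fake that records calls instead of talking to AWS."""

    def __init__(self):
        self.calls = []

    def create_stream(self, StreamName, ShardCount):
        self.calls.append(("create_stream", StreamName, ShardCount))

    def get_waiter(self, name):
        self.calls.append(("get_waiter", name))
        return self  # the fake doubles as its own waiter

    def wait(self, StreamName):
        self.calls.append(("wait", StreamName))

    def delete_stream(self, StreamName):
        self.calls.append(("delete_stream", StreamName))
```

Against real AWS, the same helper would be handed an actual Kinesis client in place of RecordingClient; the stream name and shard count are whatever the manual test needs.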


> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Affects Versions: 1.0.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Original Estimate: 336h
> Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using KCL, which offers a higher level 
> of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB "lease table" as the checkpoint state 
> storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we can not use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly once guarantees (KCL promises only 
> at-least-once). Therefore, AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and also the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outlines described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks, due to some Kinesis vs. Kafka 
> differences, are described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector having per broker connections where the connections 
> can handle multiple Kafka partitions, the Kinesis connector will only need to 
> have simple per shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector with Kafka / ZK to 
> sync outside view of progress, we probably could use ZK or DynamoDB like the 
> way KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straightforward, being almost 
> if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> 

[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206677#comment-15206677
 ] 

Robert Metzger commented on FLINK-3211:
---

Hi Roy,

I'll try to contact Tzu-Li via email to get an estimate of when this will be 
done. How urgently do you need the connector?
Maybe I can allocate some time soon to get a first version into Flink.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-21 Thread Roy Ben-Alta (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15204337#comment-15204337
 ] 

Roy Ben-Alta commented on FLINK-3211:
-

Hi,

Is there any ETA for releasing the connector?




[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-15 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15101866#comment-15101866
 ] 

Stephan Ewen commented on FLINK-3211:
-

Some comments on this:

  - Concerning the start point for reading: It makes sense to define "earliest" 
(TRIM_HORIZON) and "latest".

  - Flink does not rely on records arriving in timestamp order from the 
sources; watermarks can compensate for that. The best and easiest case is when 
records arrive in timestamp order per partition/shard (there is no requirement 
across partitions). Flink itself tracks the progress of timestamps in operators 
that consume multiple streams (like union, or when a shuffle/rebalance happens).

  - When re-reading historic data from the durable Kinesis streams, I would 
expect that all per-shard offsets are defined anyway as part of the savepoint 
from which the program starts, or the start position is simply the earliest.

  - Limiting a Kinesis consumer to one stream (multiple shards) is fine. One 
can always instantiate multiple source tasks for multiple streams and union 
them.

  - What is currently important for the exactly-once guarantees is that the 
assignment of shards to Flink source subtasks is fixed across restarts. We can 
think of ways to relax that later.
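
To illustrate the last point, here is a minimal sketch of one way to keep the shard-to-subtask assignment fixed across restarts: derive the owning subtask purely from the shard ID and the source parallelism. The class and method names are hypothetical and this is not necessarily how the connector does it. The sketch also assumes the parallelism itself does not change between restarts.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not part of Flink or the AWS SDK. */
final class ShardAssignment {

    /** The owning subtask depends only on the shard ID and the parallelism. */
    static int subtaskFor(String shardId, int parallelism) {
        // floorMod keeps the result in [0, parallelism) even for negative hash codes.
        return Math.floorMod(shardId.hashCode(), parallelism);
    }

    /** Returns the shards that the given subtask should read, in input order. */
    static List<String> shardsFor(List<String> allShardIds, int subtaskIndex, int parallelism) {
        List<String> owned = new ArrayList<>();
        for (String shardId : allShardIds) {
            if (subtaskFor(shardId, parallelism) == subtaskIndex) {
                owned.add(shardId);
            }
        }
        return owned;
    }
}
```

Because `String.hashCode()` is specified by the Java language, every subtask computes the same owner for a shard after a restart, with no coordination needed. The price is that shards may be spread unevenly over subtasks.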



[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-13 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15095796#comment-15095796
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


Very happy to help out and contribute :)



[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-13 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15095795#comment-15095795
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


Opening separate sub-tasks for source consumer / sink producer.
The consumer sub-task has an example of the expected FlinkKinesisConsumer API: 
https://issues.apache.org/jira/browse/FLINK-3229



[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-12 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15095775#comment-15095775
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


Some issues on the originally planned implementation:

1. It is not possible to let the user configure a single record sequence 
number from which to start reading a stream.
The term "sequence number" used by Kinesis is actually quite misleading. I 
found out that the sequence number of each record is tied to a specific 
shard and is therefore only meaningful within that shard (like offsets in Kafka 
partitions). This feature is therefore only viable if the user specifies, for 
every single shard, the record sequence number to start reading from. I may 
be wrong, but I doubt the usability of this feature.

2. Data ordering will be severely skewed if we start reading a stream in 
parallel from the earliest record sequence number. What is the effect on 
Flink's Event Time functionality?
Likewise, since the sequence numbers of records are completely independent 
across partitions in the stream, there is no "stream-global" earliest record 
sequence number. Therefore, when the user opts to "read from the stream 
starting from the earliest sequence number record", what we will actually do 
is start a parallel read from each shard's earliest available record.
Obviously, the consumed data will most likely be very out of order 
with respect to the order in which it was produced to the stream (this is 
actually inevitable for distributed message queues like Kafka / Kinesis). This 
reading mode is called TRIM_HORIZON in Kinesis, and the AWS KCL actually shows 
the same issue: it doesn't guarantee consumption ordering when you have more 
than one partition.
My problem is that I'm not sure how this will affect Flink's Event Time 
functionality. More often than not, Flink users will not be aware of this 
obscure stream consumption behaviour, which may have a big impact on the 
outputs of their dataflow without them realizing it.
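
To make issue 1 concrete: because a sequence number only has meaning inside its shard, a "start from sequence number" option would have to take one value per shard. A minimal sketch of such a configuration holder follows. All names are hypothetical, and the shard IDs and sequence numbers in the usage lines are made-up placeholders. Sequence numbers are kept as opaque strings, which is how the Kinesis API represents them.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical configuration holder for illustration only. */
final class ShardStartPositions {
    private final Map<String, String> sequenceNumberByShard = new HashMap<>();

    /** Registers the sequence number to start from for one shard. */
    ShardStartPositions startAt(String shardId, String sequenceNumber) {
        sequenceNumberByShard.put(shardId, sequenceNumber);
        return this;
    }

    /** Fails loudly if a shard has no start position: there is no stream-wide default. */
    String startFor(String shardId) {
        String sequenceNumber = sequenceNumberByShard.get(shardId);
        if (sequenceNumber == null) {
            throw new IllegalArgumentException(
                    "No start sequence number configured for shard " + shardId);
        }
        return sequenceNumber;
    }
}
```

A user would have to write something like `new ShardStartPositions().startAt("shardId-000000000000", "1001").startAt("shardId-000000000001", "2002")` and keep it in sync with the stream's current shards, which is the usability concern raised above.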


On the other hand, opting to start reading from the latest record 
sequence number on each shard will not have that much of an impact with 
respect to the 2nd issue. Although there is still no strict guarantee on global 
consumption ordering across partitions, the ordering won't be as skewed as with 
parallel reading from historical points, since data records are 
consumed as soon as they are produced to the stream, right from the start 
of the consumer tasks.
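
The two stream-wide starting modes discussed here correspond to the Kinesis shard iterator types TRIM_HORIZON and LATEST. A small sketch of how a user-facing setting could map onto them is shown below. The enum and its `fromConfig` method are hypothetical; only the two iterator type names come from Kinesis.

```java
import java.util.Locale;

/** Hypothetical user-facing start position; only the iterator type names come from Kinesis. */
enum StartPosition {
    EARLIEST("TRIM_HORIZON"), // each shard is read from its oldest available record
    LATEST("LATEST");         // each shard is read from records arriving after startup

    private final String shardIteratorType;

    StartPosition(String shardIteratorType) {
        this.shardIteratorType = shardIteratorType;
    }

    /** The Kinesis shard iterator type to request for every shard. */
    String shardIteratorType() {
        return shardIteratorType;
    }

    /** Parses a case-insensitive setting such as "earliest" or "latest". */
    static StartPosition fromConfig(String value) {
        return valueOf(value.trim().toUpperCase(Locale.ROOT));
    }
}
```

Either mode is applied per shard, so neither implies any ordering across shards.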

Conclusions:
Sorry about the chunky descriptions. I felt I needed to elaborate on the 
considerations in as much detail as possible :P
Firstly, I don't think it is required to implement the option to read from a 
specific sequence number. It doesn't make sense unless the user specifies a 
sequence number for every shard. It's still doable, so please tell me if you 
think this is required.
Secondly, opting to start reading from historical points (either from the 
earliest or a specific sequence number) will most likely result in very skewed 
consumption ordering. I'm curious about the effects of this on Flink 
streaming's time-related functions. Or is this something we shouldn't be 
worrying about?
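
As background for the event-time question: when an operator has several inputs, Flink advances its event time to the minimum of the watermarks of those inputs, so skew between inputs delays results rather than corrupting them. The sketch below applies the same idea to shards inside one source. It is an illustration under stated assumptions, not Flink code. The class is hypothetical, it assumes timestamps are roughly ascending within each shard, and it ignores shards that have not produced a record yet.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical tracker for illustration; not a Flink API. */
final class MinWatermarkTracker {
    private final Map<String, Long> maxTimestampByShard = new HashMap<>();

    /** Records the event timestamp of a record read from the given shard. */
    void observe(String shardId, long eventTimestamp) {
        // Keep the highest timestamp seen per shard so a late record never moves it back.
        maxTimestampByShard.merge(shardId, eventTimestamp, Math::max);
    }

    /** The combined watermark is held back by the slowest shard. */
    long currentWatermark() {
        if (maxTimestampByShard.isEmpty()) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long timestamp : maxTimestampByShard.values()) {
            min = Math.min(min, timestamp);
        }
        return min;
    }
}
```

With this scheme a shard that is still replaying old records keeps the watermark low until it catches up, which is why reading from TRIM_HORIZON affects latency more than correctness.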


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-12 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15093617#comment-15093617
 ] 

Fabian Hueske commented on FLINK-3211:
--

Thanks [~tzulitai] for your detailed issue description and research on this 
topic!
A Kinesis connector would be a very important addition to Flink.

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using KCL, which offers a higher level 
> of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB "lease table" as the checkpoint state 
> storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we can not use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly once guarantees (KCL promises only 
> at-least-once). Therefore, AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and also the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outlines described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to Kinesis vs. Kafka differences are 
> described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector having per broker connections where the connections 
> can handle multiple Kafka partitions, the Kinesis connector will only need to 
> have simple per shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector with Kafka / ZK to 
> sync outside view of progress, we probably could use ZK or DynamoDB like the 
> way KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straightforward, being almost 
> if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-10 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15091408#comment-15091408
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


Thanks [~StephanEwen] for the response and help on JIRA permission assignment.
Noted on having the implementation let the user decide where in the Kinesis 
stream to start reading ;)


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics

2016-01-10 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15090974#comment-15090974
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


Re-clarification of why using the KCL is not viable if we want a deep 
integration of Kinesis with Flink's checkpointing & distributed snapshot 
mechanics for exactly-once guarantees:

After a closer look at KCL's documentation and source code, I realized that 
KCL's own design for parallel consumption of Kinesis data works pretty much like 
Flink's Kafka connector as a whole: each instance of a KCL application is a 
"worker", which consumes records from the multiple Kinesis shards (partitions) 
assigned to it, in parallel threads. KCL decides which shards are assigned to 
which workers, and does this dynamically in the sense that when new worker 
instances are discovered, shard-to-worker assignments can be reassigned at 
runtime.

Therefore, it seems reasonable to view all Flink consumer tasks of the Kinesis 
connector as the instance pool of a single KCL application. The implementation 
will simply be to instantiate and run a KCL worker on each consumer task. KCL 
can handle the shard-to-worker assignment after each consumer task starts 
running.

Unfortunately, there are a few shortcomings if we use KCL:

1. The method for accessing the shards assigned to a Worker instance has private 
access.
2. The logic that decides where a Worker instance continues reading from a shard 
after restore is deep in the KCL code and cannot be externally configured. It is 
hardcoded to read from the KCL-managed checkpoint table (the DynamoDB "lease 
table").

These two problems eventually led me to the conclusion that it isn't possible 
for KCL to work with Flink's checkpoint and restore mechanics. I've been 
looking through the KCL source code for hacky ways to bypass the above issues, 
but have come up short.
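
To make the second problem concrete: for Flink's checkpoints to drive recovery, 
the connector itself has to own the read position of every shard, so that it 
can be captured in a snapshot and put back on restore. A minimal sketch of such 
state follows; `ShardProgress` and its methods are hypothetical names, not KCL 
or Flink APIs.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical holder for per-shard read progress; not a KCL or Flink class. */
final class ShardProgress {
    // shard ID -> sequence number of the last record emitted from that shard
    private final Map<String, String> lastSeqByShard = new HashMap<>();

    /** Records that the record with this sequence number has been emitted. */
    void advance(String shardId, String sequenceNumber) {
        lastSeqByShard.put(shardId, sequenceNumber);
    }

    /** Returns a copy of the current progress, to be stored with a checkpoint. */
    Map<String, String> snapshot() {
        return new HashMap<>(lastSeqByShard);
    }

    /** Replaces the current progress with a previously taken snapshot. */
    void restore(Map<String, String> snapshot) {
        lastSeqByShard.clear();
        lastSeqByShard.putAll(snapshot);
    }

    /** Last emitted sequence number for a shard, or null if nothing was read yet. */
    String lastSequenceNumber(String shardId) {
        return lastSeqByShard.get(shardId);
    }
}
```

With KCL, the equivalent of `restore` is fixed to read from its own DynamoDB 
table, which is exactly what cannot be overridden from outside.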

Therefore, the AWS SDK is the way to go. Although I will have to redo quite a 
bit of work that the KCL already covers, I think it is worth the effort, since 
the result would be the only processing engine currently available with 
built-in exactly-once guarantees for Kinesis. Tasks that we will have to work 
on include:

Task 1: Shard-to-task assignment for parallel data consumption.
Task 2: Assigning multiple streams to the consumer (this is actually a 
wish-list feature for KCL [1])
Task 3: Handling Kinesis-side resharding when Kinesis shards are merged or 
split.
Task 4: If Flink supports dynamic reconfiguration of task parallelism in the 
future (maybe for higher throughput), we will also need to handle Flink-side 
parallelism changes.

I propose that the scope of this JIRA focus only on tasks 1 and 2. 
Task 3 involves extensive work that isn't necessary for Kinesis to work 
properly with Flink, so it is out of the scope of this ticket.
Task 4 is blocked by the availability of dynamic task parallelism 
reconfiguration at runtime.
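
As a rough illustration of Task 1, one simple rule is to assign shards to 
parallel consumer subtasks round-robin by position in the shard list. This is 
only a sketch under the assumption that every subtask sees the same, 
identically ordered list of shard IDs; `ShardAssigner` is a hypothetical name 
and the rule is not necessarily what the final connector will use.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; the assignment rule is an illustrative assumption. */
final class ShardAssigner {
    private ShardAssigner() {}

    /** Returns the shards that the subtask with the given index should read. */
    static List<String> shardsForSubtask(List<String> allShardIds,
                                         int subtaskIndex,
                                         int parallelism) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid subtask index or parallelism");
        }
        List<String> assigned = new ArrayList<>();
        for (int i = 0; i < allShardIds.size(); i++) {
            // Round-robin by position: every shard goes to exactly one subtask.
            if (i % parallelism == subtaskIndex) {
                assigned.add(allShardIds.get(i));
            }
        }
        return assigned;
    }
}
```

Because the rule depends only on the shard list and the subtask index, each 
subtask can compute its own assignment without coordinating with the others.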


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics

2016-01-10 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15091050#comment-15091050
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


I'm currently working on this and will probably be able to open a pull request 
within a week.


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-10 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15091159#comment-15091159
 ] 

Stephan Ewen commented on FLINK-3211:
-

Awesome news. A Kinesis connector would be super nice indeed!

Looking forward to the pull request!


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-10 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15091295#comment-15091295
 ] 

Stephan Ewen commented on FLINK-3211:
-

Concerning your questions: I think going with the low-level SDK seems 
reasonable.

In Kafka/Flink, committing offsets back is mostly so that outside tools can see 
where in the stream the consumer is, and so that a new program does not always 
have to start from the beginning. It is probably fine to not worry about that 
for now.

It would be good if the implementation offered to let the user decide where in 
the stream to start reading:
  - latest record sequence number (default)
  - earliest record sequence number
  - a specific sequence number.
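
The three options could be expressed as a small enum that maps onto the shard 
iterator types accepted by the Kinesis `GetShardIterator` call (`LATEST`, 
`TRIM_HORIZON`, `AT_SEQUENCE_NUMBER`). The enum and its `fromConfig` helper are 
hypothetical names; only the iterator type strings come from the Kinesis API.

```java
import java.util.Locale;

/** Hypothetical start-position option for the consumer. */
enum StartPosition {
    LATEST("LATEST"),
    EARLIEST("TRIM_HORIZON"),
    AT_SEQUENCE_NUMBER("AT_SEQUENCE_NUMBER");

    // Value to pass as the Kinesis shard iterator type.
    private final String shardIteratorType;

    StartPosition(String shardIteratorType) {
        this.shardIteratorType = shardIteratorType;
    }

    String shardIteratorType() {
        return shardIteratorType;
    }

    /** Parses a user-supplied value, defaulting to LATEST when it is unset. */
    static StartPosition fromConfig(String value) {
        if (value == null || value.trim().isEmpty()) {
            return LATEST;
        }
        return valueOf(value.trim().toUpperCase(Locale.ROOT));
    }
}
```

An unknown value makes `valueOf` throw an `IllegalArgumentException`, which 
seems preferable to silently falling back to the default.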




[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15090928#comment-15090928
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


A follow-up regarding whether or not the Kinesis streaming connector should 
support parallel data pulling from multiple Kinesis Streams.

It makes sense to pull from multiple Kinesis streams if they are in the same 
AWS region, since globally deployed pipelines usually follow a central-regional 
practice, where a message queue like Kinesis / Kafka is also deployed in the 
central hub to ingest intermediate results from regional hubs.

Therefore, it is unlikely that there are use cases where Flink will need to 
consume data from cross-region AWS Kinesis streams. I think it's a good balance 
to design the Flink Kinesis streaming connector to support pulling in parallel 
from multiple Kinesis Streams, with the restriction that they are all deployed 
in the same AWS region.
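
A sketch of how that restriction could show up in the consumer's 
configuration: one region for the whole source and a list of stream names. 
`KinesisSourceConfig` is a hypothetical class used only to illustrate the 
design.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical consumer configuration: many streams, exactly one AWS region. */
final class KinesisSourceConfig {
    private final String region;
    private final List<String> streamNames;

    KinesisSourceConfig(String region, List<String> streamNames) {
        if (region == null || region.isEmpty()) {
            throw new IllegalArgumentException("an AWS region is required");
        }
        if (streamNames == null || streamNames.isEmpty()) {
            throw new IllegalArgumentException("at least one stream name is required");
        }
        this.region = region;
        // Defensive copy so later changes to the caller's list have no effect.
        this.streamNames = Collections.unmodifiableList(new ArrayList<>(streamNames));
    }

    String region() {
        return region;
    }

    List<String> streamNames() {
        return streamNames;
    }
}
```

Since the region is a single field rather than a per-stream setting, 
cross-region consumption is ruled out by construction.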

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using KCL, which offers a higher-level 
> of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB table as the checkpoint state storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we can not use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly once guarantees (KCL promises only 
> at-least-once). Therefore, AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and also the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outlines described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to Kinesis vs. Kafka differences are 
> described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector having per broker connections where the connections 
> can handle multiple Kafka partitions, the Kinesis connector will only need to 
> have simple per shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector with Kafka / ZK to 
> sync outside view of progress, we probably could use ZK or DynamoDB like the 
> way KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straightforward, being almost 
> if not completely the same as the Kafka sink.
> References:
> [1] 
>