[jira] [Updated] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3211:
---
Fix Version/s: (was: 1.0.0)

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to Flink's handful of streaming connectors to external 
> systems and great outreach to the AWS community.
> After a first look at the AWS KCL (Kinesis Client Library), KCL already 
> supports reading a stream beginning from a specific offset (or "record 
> sequence number" in Kinesis terminology). For external checkpointing, KCL is 
> designed to use AWS DynamoDB to checkpoint application state, where each 
> partition's progress (or "shard" in Kinesis terminology) corresponds to a 
> single row in the KCL-managed DynamoDB table.
> So, implementing the AWS Kinesis connector will very much resemble the work 
> done on the Kafka connector, with a few tweaks, as follows (I'm mainly just 
> rewording [~StephanEwen]'s original description [1]):
> 1. Determine the mapping of KCL shard workers to Flink source tasks. KCL 
> already offers worker tasks per shard, so we will need to do the mapping 
> much like [2].
> 2. Let the Flink connector also maintain a local copy of application state, 
> accessed using the KCL API, for distributed snapshot checkpointing.
> 3. Restart the KCL at the last Flink-checkpointed record sequence number 
> upon failure. However, when KCL restarts after a failure, it is designed to 
> reference the external DynamoDB table. We need a further look at how to 
> handle this so that the Flink checkpoint and the external checkpoint in 
> DynamoDB are properly synced.
> Most of the details regarding KCL's state checkpointing, sharding, shard 
> workers, and failure recovery can be found here [3].
> As for the Kinesis sink connector, it should be fairly straightforward and 
> almost, if not completely, identical to the Kafka sink. The Kinesis sink can 
> be implemented with AWS KPL (Kinesis Producer Library) [4].
> On the other hand, while KCL and KPL are handy high-level APIs for AWS 
> Kinesis, it might be preferable for the user to use the low-level API in the 
> AWS SDK instead, mainly because the utilities of KCL and KPL come at a cost, 
> such as the extra cost of the DynamoDB table for KCL checkpoints and the 
> extra latency introduced by KPL [4]. For now, we can implement the initial 
> version of the Kinesis connector with KCL and KPL, and leave a 
> user-configurable switch for the connector to use the low-level AWS SDK as 
> future work.
> References:
> [1] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [2] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [3] http://docs.aws.amazon.com/kinesis/latest/dev/advanced-consumers.html
> [4] 
> http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3211:
---
Description: 
AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
cloud service version of Apache Kafka. Support for AWS Kinesis will be a great 
addition to Flink's handful of streaming connectors to external systems and 
great outreach to the AWS community.

AWS supports two different ways to consume Kinesis data: the low-level AWS SDK 
[1], or the high-level KCL (Kinesis Client Library) [2]. The AWS SDK can be 
used to consume Kinesis data, including reading a stream beginning from a 
specific offset (or "record sequence number" in Kinesis terminology). On the 
other hand, AWS officially recommends using KCL, which offers a higher level 
of abstraction and also comes with checkpointing and failure recovery, using 
a KCL-managed AWS DynamoDB table as the checkpoint state storage.
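A minimal sketch of what "reading from a specific sequence number" with the 
low-level SDK could look like (boto3 naming shown only for brevity; the 
connector itself would use the AWS SDK for Java, and the client is injected 
here so the loop is illustrative rather than tied to real credentials):

```python
def read_from_sequence_number(client, stream, shard_id, seq_num, limit=100):
    """Yield records from one shard starting at seq_num, the Kinesis
    analogue of resuming a Kafka partition from a stored offset."""
    resp = client.get_shard_iterator(
        StreamName=stream,
        ShardId=shard_id,
        ShardIteratorType="AT_SEQUENCE_NUMBER",
        StartingSequenceNumber=seq_num,
    )
    iterator = resp["ShardIterator"]
    while iterator is not None:
        batch = client.get_records(ShardIterator=iterator, Limit=limit)
        for record in batch["Records"]:
            yield record
        # the service returns no next iterator once a closed shard is drained
        iterator = batch.get("NextShardIterator")
```

This is exactly the capability the connector needs for restoring from a Flink 
checkpoint: the stored sequence number per shard is enough to resume the read.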

However, KCL is essentially a stream processing library that hides all the 
partition-to-task (or "shard" in Kinesis terminology) assignment and 
checkpointing so that the user can focus only on streaming application logic. 
This means we cannot use KCL to implement the Kinesis streaming connector if 
we are aiming for a deep integration of Flink with Kinesis that provides 
exactly-once guarantees (KCL promises only at-least-once). Therefore, the AWS 
SDK is the way to go for the implementation of this feature.

With the ability to read from specific offsets, and given that Kinesis and 
Kafka share a lot of similarities, the basic principles of the implementation 
of Flink's Kinesis streaming connector will very much resemble those of the 
Kafka connector. We can basically follow the outline described in 
[~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
implementation [4]. A few tweaks due to differences between Kinesis and Kafka 
are described as follows:

1. While the Kafka connector can support reading from multiple topics, I 
currently don't think this is a good idea for Kinesis streams (a Kinesis Stream 
is logically equivalent to a Kafka topic). Kinesis streams can exist in 
different AWS regions, and each Kinesis stream under the same AWS user account 
may have completely independent access settings with different authorization 
keys. Overall, a Kinesis stream feels like a much more consolidated resource 
compared to Kafka topics. It would be great to hear more thoughts on this part.

2. While Kafka has brokers that can hold multiple partitions, the only 
partitioning abstraction for AWS Kinesis is the "shard". Therefore, in 
contrast to the Kafka connector's per-broker connections, where one connection 
can handle multiple Kafka partitions, the Kinesis connector only needs simple 
per-shard connections.
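Since shards are the only unit to distribute, the shard-to-source-subtask 
assignment can be a simple deterministic round-robin. A sketch (the function 
name and scheme are illustrative, not a settled design):

```python
def shards_for_subtask(all_shards, subtask_index, parallelism):
    """Assign every n-th shard (in sorted order) to one parallel Flink
    source subtask, so each shard is owned by exactly one subtask and
    every subtask can compute its share independently."""
    return [shard for i, shard in enumerate(sorted(all_shards))
            if i % parallelism == subtask_index]
```

Because the assignment is a pure function of the sorted shard list, the 
subtask index, and the parallelism, no coordination between subtasks is needed.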

3. Kinesis itself does not support committing offsets back to Kinesis. If we 
want to implement this feature the way the Kafka connector does with Kafka / 
ZK, we could probably use ZK or DynamoDB, much like KCL does. More thoughts on 
this part would be very helpful too.
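Independently of where (or whether) offsets are committed externally, the 
consumer only has to track the last-read sequence number per shard and hand 
that map to Flink's distributed snapshot mechanism. A sketch of that state 
handling (class and method names are hypothetical):

```python
class ShardOffsetTracker:
    """Per-shard sequence-number state that a Flink source subtask could
    snapshot on a checkpoint barrier and restore on recovery."""

    def __init__(self):
        self._last_seq = {}  # shard_id -> last sequence number read

    def record_read(self, shard_id, seq_num):
        self._last_seq[shard_id] = seq_num

    def snapshot(self):
        # Called at a checkpoint barrier; the returned map is what gets
        # persisted as part of Flink's distributed snapshot.
        return dict(self._last_seq)

    def restore(self, state):
        # Called on recovery; each shard read resumes from these numbers.
        self._last_seq = dict(state)
```

With this, exactly-once for the source reduces to resuming each shard from 
the snapshotted sequence number, without relying on KCL's DynamoDB table.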


As for the Kinesis sink, it should be possible to use the AWS KPL (Kinesis 
Producer Library) [5]. However, for higher code consistency with the proposed 
Kinesis consumer, I think it is better to still go with the AWS SDK for the 
implementation. The implementation should be straightforward, being almost if 
not completely the same as the Kafka sink.
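An SDK-based sink would essentially buffer records and flush them in batches, 
much like the Kafka producer sink. A sketch of the flush step (boto3 naming 
shown for brevity and the client injected; the real sink would use the AWS 
SDK for Java):

```python
def flush_batch(client, stream, batch):
    """Send a buffered batch of (partition_key, data) pairs to a Kinesis
    stream in one PutRecords call; returns the number of failed records,
    which the caller would retry."""
    if not batch:
        return 0
    resp = client.put_records(
        StreamName=stream,
        Records=[{"PartitionKey": key, "Data": data} for key, data in batch],
    )
    return resp.get("FailedRecordCount", 0)
```

Note that PutRecords can partially fail, so unlike the Kafka sink a retry 
loop over the failed subset would be needed for at-least-once delivery.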

References:
[1] 
http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
[2] 
http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
[3] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
[4] http://data-artisans.com/kafka-flink-a-practical-how-to/
[5] 
http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998

[jira] [Updated] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3211:
---
Affects Version/s: 1.0.0

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great reach out to the AWS community.
> After a first look at the AWS KCL (Kinesis Client Library), KCL already 
> supports stream read beginning from a specific offset (or "record sequence 
> number" in Kinesis terminology). For external checkpointing, KCL is designed  
> to use AWS DynamoDB to checkpoint application state, where each partition's 
> progress (or "shard" in Kinesis terminology) corresponds to a single row in 
> the KCL-managed DynamoDB table.
> So, implementing the AWS Kinesis connector will very much resemble the work 
> done on the Kafka connector, with a few different tweaks as following (I'm 
> mainly just rewording [~StephanEwen]'s original description [1]):
> 1. Determine KCL Shard Worker to Flink source task mapping. KCL already 
> offers worker tasks per shard, so we will need to do mapping much like [2].
> 2. Let the Flink connector also maintain a local copy of application state, 
> accessed using KCL API, for the distributed snapshot checkpointing.
> 3. Restart the KCL at the last Flink local checkpointed record sequence upon 
> failure. However, when KCL restarts after failure, it is originally designed 
> to reference the external DynamoDB table. Need a further look on how to work 
> with this so that the Flink checkpoint and external checkpoint in DynamoDB is 
> properly synced.
> Most of the details regarding KCL's state checkpointing, sharding, shard 
> workers, and failure recovery can be found here [3].
> As for the Kinesis sink connector, it should be fairly straightforward and 
> almost, if not completely, identical to the Kafka sink. The Kinesis sink can 
> be implemented with AWS KPL (Kinesis Producer Library) [4].
> On the other hand, while KCL and KPL are handy high-level APIs for AWS 
> Kinesis, it might be preferable for the user to use the low-level API in AWS 
> SDK instead, mainly due to the fact that the utilities of KCL and KPL come at 
> cost, such as extra cost for the DynamoDB table for KCL checkpoint and the 
> extra latency introduced by KPL [4]. For now, we can implement the initial 
> version of the Kinesis connector with KCL and KPL, and leave the work for 
> user-configurable switch for the connector to use low-level AWS SDK as future 
> to-dos.
> References:
> [1] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [2] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [3] http://docs.aws.amazon.com/kinesis/latest/dev/advanced-consumers.html
> [4] 
> http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3211:
---
Description: 
AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
cloud service version of Apache Kafka. Support for AWS Kinesis will be a great 
addition to the handful of Flink's streaming connectors to external systems and 
a great reach out to the AWS community.

AWS supports two different ways to consume Kinesis data: with the low-level AWS 
SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK can 
be used to consume Kinesis data, including stream read beginning from a 
specific offset (or "record sequence number" in Kinesis terminology). On the 
other hand, AWS officially recommends using KCL, which offers a more 
higher-level of abstraction that also comes with checkpointing and failure 
recovery by using a KCL-managed AWS DynamoDB as the checkpoint state storage.

However, KCL is essentially a stream processing library that wraps all the 
partition-to-task (or "shard" in Kinesis terminology) determination and 
checkpointing to allow the user to focus only on streaming application logic. 
This leads to the understanding that we can not use the KCL to implement the 
Kinesis streaming connector if we are aiming for a deep integration of Flink 
with Kinesis that provides exactly once guarantees (KCL promises only 
at-least-once). Therefore, AWS SDK will be the way to go for the implementation 
of this feature.

With the ability to read from specific offsets, and also the fact that Kinesis 
and Kafka share a lot of similarities, the basic principles of the 
implementation of Flink's Kinesis streaming connector will very much resemble 
the Kafka connector. We can basically follow the outlines described in 
[~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
implementation [4]. A few tweaks due to some of Kinesis v.s. Kafka differences 
is described as following:

1. While the Kafka connector can support reading from multiple topics, I 
currently don't think this is a good idea for Kinesis streams (a Kinesis Stream 
is logically equivalent to a Kafka topic). AWS can manage Kinesis across 
different regions, and each Kinesis stream under the same AWS user account may 
have completely independent access settings with different authorization keys. 
Overall, a Kinesis stream feels like a much more consolidated resource compared 
to Kafka topics. It would be great to here more thoughts on this part.

2. While Kafka has brokers that can hold multiple partitions, the only 
partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast to 
the Kafka connector having per broker connections where the connections can 
handle multiple Kafka partitions, the Kinesis connector will only need to have 
simple per shard connections.

3. Kinesis itself does not support committing offsets back to Kinesis. If we 
were to implement this feature like the Kafka connector with Kafka / ZK, we 
probably could use ZK or DynamoDB like the way KCL works. More thoughts on this 
part will be very helpful too.


As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
Producer Library) [5]. However, for higher code consistency with the proposed 
Kinesis Consumer, I think it will be better to still go with the AWS SDK for 
the implementation. The implementation should be straight forward, being almost 
if not completely the same as the Kafka sink.

References:
[1] 
http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
[2] 
http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
[3] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
[4] http://data-artisans.com/kafka-flink-a-practical-how-to/
[5] 
http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998

  was:
AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
cloud service version of Apache Kafka. Support for AWS Kinesis will be a great 
addition to the handful of Flink's streaming connectors to external systems and 
a great reach out to the AWS community.

After a first look at the AWS KCL (Kinesis Client Library), KCL already 
supports stream read beginning from a specific offset (or "record sequence 
number" in Kinesis terminology). For external checkpointing, KCL is designed  
to use AWS DynamoDB to checkpoint application state, where each partition's 
progress (or "shard" in Kinesis terminology) corresponds to a single row in the 
KCL-managed DynamoDB table.

So, implementing the AWS Kinesis connector will very much resemble the work 
done on the Kafka connector, with a few different tweaks as following (I'm 
mainly just rewording [~StephanEwen]'s original description [1]):

1. Determine KCL Shard Worker to Flink source 

[jira] [Updated] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3211:
---
Description: 
AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
cloud service version of Apache Kafka. Support for AWS Kinesis will be a great 
addition to the handful of Flink's streaming connectors to external systems and 
a great reach out to the AWS community.

AWS supports two different ways to consume Kinesis data: with the low-level AWS 
SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK can 
be used to consume Kinesis data, including stream read beginning from a 
specific offset (or "record sequence number" in Kinesis terminology). On the 
other hand, AWS officially recommends using KCL, which offers a higher-level of 
abstraction that also comes with checkpointing and failure recovery by using a 
KCL-managed AWS DynamoDB as the checkpoint state storage.

However, KCL is essentially a stream processing library that wraps all the 
partition-to-task (or "shard" in Kinesis terminology) determination and 
checkpointing to allow the user to focus only on streaming application logic. 
This leads to the understanding that we can not use the KCL to implement the 
Kinesis streaming connector if we are aiming for a deep integration of Flink 
with Kinesis that provides exactly once guarantees (KCL promises only 
at-least-once). Therefore, AWS SDK will be the way to go for the implementation 
of this feature.

With the ability to read from specific offsets, and also the fact that Kinesis 
and Kafka share a lot of similarities, the basic principles of the 
implementation of Flink's Kinesis streaming connector will very much resemble 
the Kafka connector. We can basically follow the outlines described in 
[~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
implementation [4]. A few tweaks due to some of Kinesis v.s. Kafka differences 
is described as following:

1. While the Kafka connector can support reading from multiple topics, I 
currently don't think this is a good idea for Kinesis streams (a Kinesis Stream 
is logically equivalent to a Kafka topic). AWS can manage Kinesis across 
different regions, and each Kinesis stream under the same AWS user account may 
have completely independent access settings with different authorization keys. 
Overall, a Kinesis stream feels like a much more consolidated resource compared 
to Kafka topics. It would be great to here more thoughts on this part.

2. While Kafka has brokers that can hold multiple partitions, the only 
partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast to 
the Kafka connector having per broker connections where the connections can 
handle multiple Kafka partitions, the Kinesis connector will only need to have 
simple per shard connections.

3. Kinesis itself does not support committing offsets back to Kinesis. If we 
were to implement this feature like the Kafka connector with Kafka / ZK, we 
probably could use ZK or DynamoDB like the way KCL works. More thoughts on this 
part will be very helpful too.


As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
Producer Library) [5]. However, for higher code consistency with the proposed 
Kinesis Consumer, I think it will be better to still go with the AWS SDK for 
the implementation. The implementation should be straight forward, being almost 
if not completely the same as the Kafka sink.

References:
[1] 
http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
[2] 
http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
[3] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
[4] http://data-artisans.com/kafka-flink-a-practical-how-to/
[5] 
http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998


[jira] [Updated] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3211:
---
Description: 
AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
cloud service version of Apache Kafka. Support for AWS Kinesis will be a great 
addition to Flink's handful of streaming connectors to external systems and a 
great outreach to the AWS community.

AWS supports two different ways to consume Kinesis data: the low-level AWS SDK 
[1], or the high-level KCL (Kinesis Client Library) [2]. The AWS SDK can be 
used to consume Kinesis data, including reading a stream from a specific offset 
(or "record sequence number" in Kinesis terminology). On the other hand, AWS 
officially recommends using the KCL, which offers a higher level of abstraction 
and also comes with checkpointing and failure recovery, using a KCL-managed AWS 
DynamoDB table as the checkpoint state storage.

However, the KCL is essentially a stream processing library that wraps all the 
partition-to-task (or "shard" in Kinesis terminology) assignment and 
checkpointing, letting the user focus only on streaming application logic. This 
means we cannot use the KCL to implement the Kinesis streaming connector if we 
are aiming for a deep integration of Flink with Kinesis that provides 
exactly-once guarantees (the KCL promises only at-least-once). Therefore, the 
AWS SDK will be the way to go for the implementation of this feature.

With the ability to read from specific offsets, and given that Kinesis and 
Kafka share a lot of similarities, the basic principles of the implementation 
of Flink's Kinesis streaming connector will very much resemble the Kafka 
connector. We can basically follow the outlines described in [~StephanEwen]'s 
description [3] and [~rmetzger]'s Kafka connector implementation [4]. A few 
tweaks due to differences between Kinesis and Kafka are described as follows:

1. While the Kafka connector can support reading from multiple topics, I 
currently don't think this is a good idea for Kinesis streams (a Kinesis stream 
is logically equivalent to a Kafka topic). Kinesis streams can exist in 
different AWS regions, and each Kinesis stream under the same AWS user account 
may have completely independent access settings with different authorization 
keys. Overall, a Kinesis stream feels like a much more consolidated resource 
than a Kafka topic. It would be great to hear more thoughts on this part.

2. While Kafka has brokers that can hold multiple partitions, the only 
partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast to 
the Kafka connector, which holds per-broker connections that can each handle 
multiple Kafka partitions, the Kinesis connector only needs simple per-shard 
connections.

3. Kinesis itself does not support committing offsets back to Kinesis. If we 
were to implement this feature the way the Kafka connector does with Kafka / ZK, 
we could probably use ZK or DynamoDB, the way the KCL does. More thoughts on 
this part would be very helpful too.


As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
Producer Library) [5]. However, for higher code consistency with the proposed 
Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
implementation. The implementation should be straightforward, being almost if 
not completely the same as the Kafka sink.

References:
[1] 
http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
[2] 
http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
[3] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
[4] http://data-artisans.com/kafka-flink-a-practical-how-to/
[5] 
http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15090928#comment-15090928
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


A follow-up regarding whether the Kinesis streaming connector should support 
parallel data pulling from multiple Kinesis streams.

It makes sense to pull from multiple Kinesis streams if they are in the same 
AWS region, since globally deployed pipelines usually follow a central-regional 
practice in which a message queue like Kinesis / Kafka is also deployed in the 
central hub to ingest intermediate results from the regional hubs.

Therefore, it is unlikely that there are use cases where Flink will need to 
consume data from cross-region AWS Kinesis streams. I think it's a good balance 
to design the Flink Kinesis streaming connector to support parallel pulls from 
multiple Kinesis streams, with the restriction that they are all deployed in 
the same AWS region.
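The same-region restriction could be enforced up front when the connector is configured. A tiny illustrative check (the function name and config shape are assumptions, not the connector's actual API):

```python
def validate_same_region(stream_configs):
    """Reject a consumer configuration that mixes AWS regions, per the
    restriction discussed above. `stream_configs` maps stream name to the
    region it lives in; returns the single shared region."""
    regions = set(stream_configs.values())
    if len(regions) > 1:
        raise ValueError(
            "all Kinesis streams must be in one AWS region, got %s"
            % sorted(regions))
    return regions.pop()
```

Failing fast at configuration time avoids discovering a cross-region (or cross-credential) mismatch only after the consumer tasks have started.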

[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15090928#comment-15090928
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 1/10/16 6:40 AM:
-

A follow-up regarding whether the Kinesis streaming connector should support 
parallel data pulling from multiple Kinesis streams.

It makes sense to pull from multiple Kinesis streams if they are in the same 
AWS region, since globally deployed pipelines usually follow a central-regional 
practice in which a message queue like Kinesis / Kafka is also deployed in the 
central hub to ingest intermediate results from the regional hubs.

Therefore, it is unlikely that there are use cases where Flink will need to 
consume data from cross-region AWS Kinesis streams. I think it's a good balance 
to design the Flink Kinesis streaming connector to support parallel pulls from 
multiple Kinesis streams, with the restriction that they are all deployed in 
the same AWS region and that they all have identical access settings (i.e., 
they can all be accessed with the same single AWS credential).




[jira] [Created] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-08 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-3211:
--

 Summary: Add AWS Kinesis streaming connector
 Key: FLINK-3211
 URL: https://issues.apache.org/jira/browse/FLINK-3211
 Project: Flink
  Issue Type: New Feature
  Components: Streaming Connectors
Reporter: Tzu-Li (Gordon) Tai
 Fix For: 1.0.0


AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
cloud service version of Apache Kafka. Support for AWS Kinesis will be a great 
addition to Flink's handful of streaming connectors to external systems and a 
great outreach to the AWS community.

After a first look at the AWS KCL (Kinesis Client Library), the KCL already 
supports reading a stream from a specific offset (or "record sequence number" 
in Kinesis terminology). For external checkpointing, the KCL is designed to use 
AWS DynamoDB to checkpoint application state, where the progress of each 
partition (or "shard" in Kinesis terminology) corresponds to a single row in 
the KCL-managed DynamoDB table.
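To make the "one DynamoDB row per shard" model concrete, the checkpoint state is conceptually just a map from shard id to the last processed sequence number. A hypothetical in-memory stand-in (not the KCL API):

```python
class ShardCheckpoints:
    """Conceptual model of the KCL's checkpoint table: one row per shard,
    each row holding the last processed record sequence number."""

    def __init__(self):
        self._rows = {}  # shard_id -> last processed sequence number

    def update(self, shard_id, seq):
        """Record progress for one shard (one 'row' upsert)."""
        self._rows[shard_id] = seq

    def get(self, shard_id):
        """Return the resume point for a shard, or None if it has never
        been checkpointed (i.e. start from the beginning)."""
        return self._rows.get(shard_id)
```

Keeping the mental model this simple is what later makes it plausible to swap DynamoDB for Flink's own distributed snapshots.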

So, implementing the AWS Kinesis connector will very much resemble the work 
done on the Kafka connector, with a few tweaks as follows (I'm mainly just 
rewording [~StephanEwen]'s original description [1]):

1. Determine the KCL shard-worker to Flink source-task mapping. The KCL already 
offers worker tasks per shard, so we will need to do the mapping much like [2].

2. Let the Flink connector also maintain a local copy of application state, 
accessed using the KCL API, for the distributed snapshot checkpointing.

3. Restart the KCL at the last Flink-local checkpointed record sequence upon 
failure. However, when the KCL restarts after failure, it is originally 
designed to reference the external DynamoDB table. This needs a further look so 
that the Flink checkpoint and the external checkpoint in DynamoDB are properly 
synced.

Most of the details regarding KCL's state checkpointing, sharding, shard 
workers, and failure recovery can be found here [3].

As for the Kinesis sink connector, it should be fairly straightforward and 
almost, if not completely, identical to the Kafka sink.

References:
[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
[2] http://data-artisans.com/kafka-flink-a-practical-how-to/
[3] http://docs.aws.amazon.com/kinesis/latest/dev/advanced-consumers.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3211:
---
Description: 
AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
cloud service version of Apache Kafka. Support for AWS Kinesis will be a great 
addition to Flink's handful of streaming connectors to external systems and a 
great outreach to the AWS community.

After a first look at the AWS KCL (Kinesis Client Library), the KCL already 
supports reading a stream from a specific offset (or "record sequence number" 
in Kinesis terminology). For external checkpointing, the KCL is designed to use 
AWS DynamoDB to checkpoint application state, where the progress of each 
partition (or "shard" in Kinesis terminology) corresponds to a single row in 
the KCL-managed DynamoDB table.

So, implementing the AWS Kinesis connector will very much resemble the work 
done on the Kafka connector, with a few tweaks as follows (I'm mainly just 
rewording [~StephanEwen]'s original description [1]):

1. Determine the KCL shard-worker to Flink source-task mapping. The KCL already 
offers worker tasks per shard, so we will need to do the mapping much like [2].

2. Let the Flink connector also maintain a local copy of application state, 
accessed using the KCL API, for the distributed snapshot checkpointing.

3. Restart the KCL at the last Flink-local checkpointed record sequence upon 
failure. However, when the KCL restarts after failure, it is originally 
designed to reference the external DynamoDB table. This needs a further look so 
that the Flink checkpoint and the external checkpoint in DynamoDB are properly 
synced.

Most of the details regarding KCL's state checkpointing, sharding, shard 
workers, and failure recovery can be found here [3].

As for the Kinesis sink connector, it should be fairly straightforward and 
almost, if not completely, identical to the Kafka sink. The Kinesis sink can be 
implemented with the AWS KPL (Kinesis Producer Library) [4].

On the other hand, while the KCL and KPL are handy high-level APIs for AWS 
Kinesis, it might be preferable for the user to use the low-level API in the 
AWS SDK instead, mainly because the utilities of the KCL and KPL come at a 
cost, such as the extra cost of the DynamoDB table for KCL checkpoints and the 
extra latency introduced by the KPL [4]. For now, we can implement the initial 
version of the Kinesis connector with the KCL and KPL, and leave a 
user-configurable switch for the connector to use the low-level AWS SDK as a 
future to-do.

References:
[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
[2] http://data-artisans.com/kafka-flink-a-practical-how-to/
[3] http://docs.aws.amazon.com/kinesis/latest/dev/advanced-consumers.html
[4] 
http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-12 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15095775#comment-15095775
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


Some issues with the originally planned implementation:

1. It is not possible to let the user configure reading a stream from a single 
specific record sequence number.
The term "sequence number" used by Kinesis is actually quite misleading. I 
found out that the sequence number of each record is encoded to a specific 
shard, and is therefore only meaningful within that shard (like offsets in 
Kafka partitions). This feature is thus only viable if the user specifies the 
record sequence number to start reading from for each single shard. I may be 
wrong, but I doubt the usability of this feature.
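In other words, since sequence numbers are shard-local, a user-supplied start position would have to be a per-shard map rather than a single number for the whole stream. A sketch of what resolving start positions might look like (function and iterator-type names mirror the Kinesis API, but the helper itself is a hypothetical illustration):

```python
def starting_positions(shard_ids, per_shard_seq=None):
    """Resolve a start position per shard. Shards with a user-supplied
    sequence number resume after it; all others fall back to the shard's
    earliest record (TRIM_HORIZON), since no stream-global sequence
    number exists."""
    out = {}
    for sid in shard_ids:
        seq = (per_shard_seq or {}).get(sid)
        if seq is not None:
            out[sid] = ("AFTER_SEQUENCE_NUMBER", seq)
        else:
            out[sid] = ("TRIM_HORIZON", None)
    return out
```

This makes the usability concern visible: a user would need to know every shard id up front to use sequence-number start positions at all.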

2. Data ordering will be severely skewed if we start reading a stream in 
parallel from the earliest record sequence number. Does this affect Flink's 
event-time functionality?
Likewise, since the sequence numbers of records are completely independent 
across partitions in the stream, there is no "stream-global" earliest record 
sequence number. Therefore, when the user opts to "read the stream starting 
from the earliest record", what we will actually be doing is starting parallel 
reads from each shard's earliest available record.
Obviously, the ordering of the consumed data will most likely be very out of 
order with respect to the order in which it was produced to the stream (this is 
actually an inevitable situation for distributed message queues like Kafka / 
Kinesis). This reading mode is called TRIM_HORIZON in Kinesis, and the AWS KCL 
actually shows the same issue: it does not guarantee consumption ordering when 
you have more than one partition.
My problem is that I'm not sure how this will affect Flink's event-time 
functionality. More often than not, Flink users will not be aware of this 
obscure stream consumption behaviour, which may have a big impact on the 
outputs of the dataflow that they may not realize.


On the other hand, opting to start reading from the latest record sequence 
number on each shard will not have as much of an impact regarding the 2nd 
issue. Although there is still no strict guarantee on global consumption 
ordering across partitions, the ordering won't be as skewed as with parallel 
reads starting from historical points, since data records are consumed as soon 
as they are produced to the stream, right from the start of the consumer tasks.

Conclusions:
Sorry about the chunky description. I felt I needed to elaborate the 
considerations in as much detail as possible :P
Firstly, I don't think it is required to implement reading from a specific 
sequence number. It doesn't make sense unless the user specifies a sequence 
number for every shard. It's still doable, so please tell me if you think this 
is required.
Secondly, opting to start reading from historical points (either from the 
earliest or a specific sequence number) will most likely result in very skewed 
consumption ordering. I'm curious about the effects of this on Flink 
streaming's time-related functions. Or is this something we shouldn't be 
worrying about?


[jira] [Created] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-01-13 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-3231:
--

 Summary: Handle Kinesis-side resharding in Kinesis streaming 
consumer
 Key: FLINK-3231
 URL: https://issues.apache.org/jira/browse/FLINK-3231
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Affects Versions: 1.0.0
Reporter: Tzu-Li (Gordon) Tai


A big difference between Kinesis shards and Kafka partitions is that Kinesis 
users can choose to "merge" and "split" shards at any time for adjustable 
stream throughput capacity. This article explains this quite clearly: 
https://brandur.org/kinesis-by-example.

This will break the static shard-to-task mapping implemented in the basic 
version of the Kinesis consumer 
(https://issues.apache.org/jira/browse/FLINK-3211). The static shard-to-task 
mapping is a simple round-robin-like distribution that can be determined 
locally at each Flink consumer task (the Flink Kafka consumer does this too).
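The static round-robin distribution can be computed locally by each subtask without any coordination. A minimal sketch (hypothetical helper, mirroring what the Kafka consumer does):

```python
def assign_shards(shard_ids, num_subtasks, subtask_index):
    """Round-robin shard-to-subtask assignment. Sorting first gives every
    subtask the same global view, so each one can compute its own slice
    locally with no coordinator."""
    return [s for i, s in enumerate(sorted(shard_ids))
            if i % num_subtasks == subtask_index]
```

This is exactly the scheme that breaks under resharding: once Kinesis closes a shard and opens new ones, the locally computed assignment goes stale, which is what motivates the coordinator discussed below.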

To handle Kinesis resharding, we will need some way for the Flink consumer 
tasks to coordinate which shards they are currently handling, and to let a task 
ask the coordinator for a shard reassignment when it finds a closed shard at 
runtime (shards are closed by Kinesis when they are merged or split).

A possible approach is a centralized coordinator state store that is visible to 
all Flink consumer tasks. Tasks can use this state store to locally determine 
which shards they should be reassigned. Zookeeper could be used for this state 
store, but that would require the user to set up ZK for the connector to work.

Since this feature introduces extensive work, it is opened as a separate 
sub-task from the basic implementation 
https://issues.apache.org/jira/browse/FLINK-3211.





[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-13 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15095796#comment-15095796
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


Very happy to help out and contribute :)

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using KCL, which offers a higher-level 
> of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB "leash table" as the checkpoint state 
> storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we can not use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly once guarantees (KCL promises only 
> at-least-once). Therefore, AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and given that Kinesis and 
> Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much 
> resemble the Kafka connector. We can basically follow the outline in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to Kinesis vs. Kafka differences are 
> described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> self-contained resource than a Kafka topic. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is the "shard". Therefore, in 
> contrast to the Kafka connector's per-broker connections, each of which can 
> handle multiple Kafka partitions, the Kinesis connector will only need 
> simple per-shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> want this feature, analogous to the Kafka connector syncing an outside view 
> of progress to Kafka / ZK, we could probably use ZK or DynamoDB, the way the 
> KCL does. More thoughts on this part would be very helpful too.
> As for the Kinesis sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the 
> proposed Kinesis consumer, I think it will be better to stick with the AWS 
> SDK for the implementation. The implementation should be straightforward, 
> being almost if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-13 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15095795#comment-15095795
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


Opening separate sub-tasks for source consumer / sink producer.
The consumer sub-task has an example of the expected FlinkKinesisConsumer API: 
https://issues.apache.org/jira/browse/FLINK-3229



[jira] [Created] (FLINK-3229) Kinesis consumer with integration of Flink's checkpointing mechanics

2016-01-13 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-3229:
--

 Summary: Kinesis consumer with integration of Flink's 
checkpointing mechanics
 Key: FLINK-3229
 URL: https://issues.apache.org/jira/browse/FLINK-3229
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Affects Versions: 1.0.0
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai


Opening a sub-task to implement the data source consumer for the Kinesis 
streaming connector (https://issues.apache.org/jira/browse/FLINK-3211).

An example of the planned user API for Flink Kinesis Consumer:

{code}
Properties config = new Properties();
config.put(FlinkKinesisConsumer.CONFIG_STREAM_DESCRIBE_RETRIES, "3");
config.put(FlinkKinesisConsumer.CONFIG_STREAM_DESCRIBE_BACKOFF_MILLIS, "1000");
config.put(FlinkKinesisConsumer.CONFIG_STREAM_START_POSITION_TYPE, "latest");
config.put(FlinkKinesisConsumer.CONFIG_AWS_REGION, "us-east-1");

AWSCredentialsProvider credentials = 
    new DefaultAWSCredentialsProviderChain(); // or any credentials API in AWS SDK

DataStream<String> kinesisRecords = env
    .addSource(new FlinkKinesisConsumer<>(
        listOfStreams, credentials, new SimpleStringSchema(), config
    ));
{code}

Currently still considering which read start positions to support 
("TRIM_HORIZON", "LATEST", "AT_SEQUENCE_NUMBER"). The discussion on this can 
be found in https://issues.apache.org/jira/browse/FLINK-3211.
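As a rough sketch, the configured start position would presumably be translated into one of the shard iterator types understood by Kinesis' GetShardIterator API. The `StartPosition` helper and the lowercase config values below are hypothetical illustrations, not part of any existing connector API:

```java
// Hypothetical helper: map a start-position config value (as it might appear
// in the Properties-based config above) to the shard iterator type string
// used by the Kinesis GetShardIterator API.
class StartPosition {
    static String toShardIteratorType(String configValue) {
        switch (configValue.toLowerCase()) {
            case "latest":
                return "LATEST";             // only records arriving from now on
            case "trim_horizon":
                return "TRIM_HORIZON";       // oldest record still retained
            case "at_sequence_number":
                return "AT_SEQUENCE_NUMBER"; // resume from a checkpointed sequence number
            default:
                throw new IllegalArgumentException(
                    "Unknown start position: " + configValue);
        }
    }
}
```

The AT_SEQUENCE_NUMBER case is what the checkpoint-restore path would rely on, which is why it is part of the discussion above.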





[jira] [Updated] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-01-13 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3229:
---
Summary: Kinesis streaming consumer with integration of Flink's 
checkpointing mechanics  (was: Kinesis consumer with integration of Flink's 
checkpointing mechanics)



[jira] [Created] (FLINK-3230) Kinesis streaming producer

2016-01-13 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-3230:
--

 Summary: Kinesis streaming producer
 Key: FLINK-3230
 URL: https://issues.apache.org/jira/browse/FLINK-3230
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Reporter: Tzu-Li (Gordon) Tai


Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
be using the AWS SDK implementation for code consistency with the 
FlinkKinesisConsumer.

The features of FlinkKinesisProducer are rather straightforward:
1. Partition put records based on a partition key.
2. Configurable put mode: bulk puts for higher throughput vs. sequential 
single-record puts. The bulk size should also be configurable.
3. For bulk puts, the user can also choose to enforce strict ordering of the 
results, with the tradeoff of higher put latency. Ref: https://brandur.org/kinesis-order
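The configurable bulk-put mode in point 2 can be sketched as a small buffering layer in front of the actual put call. The `BulkBuffer` class and its flush callback below are hypothetical illustrations (the callback standing in for, e.g., a Kinesis PutRecords request), not actual connector code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: buffer records and hand a batch to a flush callback
// (e.g. a Kinesis PutRecords call) once the configured bulk size is reached.
class BulkBuffer<T> {
    private final int bulkSize;
    private final Consumer<List<T>> flusher;
    private final List<T> buffer = new ArrayList<>();

    BulkBuffer(int bulkSize, Consumer<List<T>> flusher) {
        this.bulkSize = bulkSize;
        this.flusher = flusher;
    }

    void add(T record) {
        buffer.add(record);
        if (buffer.size() >= bulkSize) {
            flush();
        }
    }

    void flush() {
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer)); // snapshot, then clear
            buffer.clear();
        }
    }
}
```

Note that with a bulk size of 1 this degenerates to sequential single-record puts, so both put modes can fall out of a single code path.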





[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-13 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15095796#comment-15095796
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 1/13/16 8:11 AM:
-

Very happy to help out and contribute!


was (Author: tzulitai):
Very happy to help out and contribute :)



[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-13 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15095795#comment-15095795
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 1/13/16 9:50 AM:
-

Opening separate sub-tasks for source consumer / sink producer.
The consumer sub-task has an example of the expected FlinkKinesisConsumer API: 
https://issues.apache.org/jira/browse/FLINK-3229
I'll be posting any future consumer-specific comments in the sub-task.


was (Author: tzulitai):
Opening separate sub-tasks for source consumer / sink producer.
The consumer sub-task has an example of the expected FlinkKinesisConsumer API: 
https://issues.apache.org/jira/browse/FLINK-3229
I'll post any future consumer-specific comments in the sub-task.


[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-13 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15095795#comment-15095795
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 1/13/16 9:49 AM:
-

Opening separate sub-tasks for source consumer / sink producer.
The consumer sub-task has an example of the expected FlinkKinesisConsumer API: 
https://issues.apache.org/jira/browse/FLINK-3229
I'll post any future consumer-specific comments in the sub-task.


was (Author: tzulitai):
Opening separate sub-tasks for source consumer / sink producer.
The consumer sub-task has an example of the expected FlinkKinesisConsumer API: 
https://issues.apache.org/jira/browse/FLINK-3229


[jira] [Updated] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-01-13 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3231:
---
Description: 
A big difference between Kinesis shards and Kafka partitions is that Kinesis 
users can choose to "merge" and "split" shards at any time for adjustable 
stream throughput capacity. This article explains this quite clearly: 
https://brandur.org/kinesis-by-example.

This will break the static shard-to-task mapping implemented in the basic 
version of the Kinesis consumer 
(https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
mapping is done in a simple round-robin-like distribution which can be locally 
determined at each Flink consumer task (Flink Kafka consumer does this too).

To handle Kinesis resharding, we will need some way to let the Flink consumer 
tasks coordinate which shards they are currently handling, and to allow a task 
to ask the coordinator for a shard reassignment when it encounters a closed 
shard at runtime (Kinesis closes shards when they are merged or split).

We need a centralized coordinator state store which is visible to all Flink 
consumer tasks. Tasks can use this state store to locally determine which 
shards they can be reassigned. Amazon KCL uses a DynamoDB table for the 
coordination, but as described in https://issues.apache.org/jira/browse/FLINK-3229, we 
unfortunately can't use KCL for the implementation of the consumer if we want 
to leverage Flink's checkpointing mechanics. For our own implementation, 
Zookeeper can be used for this state store, but that means it would require 
the user to set up ZK for this to work.

Since this feature introduces extensive work, it is opened as a separate 
sub-task from the basic implementation 
https://issues.apache.org/jira/browse/FLINK-3229.
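For reference, the static round-robin-like shard-to-task distribution that resharding breaks can be computed locally by each parallel subtask roughly as follows. The `ShardAssigner` class is a hypothetical sketch, not the actual consumer code:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: each of the N parallel consumer subtasks locally picks
// the shards whose index maps to it round-robin. No coordination is needed as
// long as the shard list is static (the Flink Kafka consumer does the same).
class ShardAssigner {
    static List<String> shardsForTask(List<String> allShards,
                                      int taskIndex, int parallelism) {
        List<String> assigned = new ArrayList<>();
        for (int i = 0; i < allShards.size(); i++) {
            if (i % parallelism == taskIndex) {
                assigned.add(allShards.get(i));
            }
        }
        return assigned;
    }
}
```

Once shards can be closed, merged, or split at runtime, this purely local computation no longer yields a stable assignment, hence the need for the coordinator state store described above.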

  was:
A big difference between Kinesis shards and Kafka partitions is that Kinesis 
users can choose to "merge" and "split" shards at any time for adjustable 
stream throughput capacity. This article explains this quite clearly: 
https://brandur.org/kinesis-by-example.

This will break the static shard-to-task mapping implemented in the basic 
version of the Kinesis consumer 
(https://issues.apache.org/jira/browse/FLINK-3211). The static shard-to-task 
mapping is done in a simple round-robin-like distribution which can be locally 
determined at each Flink consumer task (Flink Kafka consumer does this too).

To handle Kinesis resharding, we will need some way to let the Flink consumer 
tasks coordinate which shards they are currently handling, and allow the tasks 
to ask the coordinator for a shards reassignment when the task finds out it has 
found a closed shard at runtime (shards will be closed by Kinesis when it is 
merged and split).

A possible approach to this is a centralized coordinator state store which is 
visible to all Flink consumer tasks. Tasks can use this state store to locally 
determine what shards it can be reassigned. Zookeeper can be used for this 
state store, but that means it would require the user to set up ZK to work.

Since this feature introduces extensive work, it is opened as a separate 
sub-task from the basic implementation 
https://issues.apache.org/jira/browse/FLINK-3211.


> Handle Kinesis-side resharding in Kinesis streaming consumer
> 
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>

[jira] [Updated] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-01-13 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3231:
---
Description: 
A big difference between Kinesis shards and Kafka partitions is that Kinesis 
users can choose to "merge" and "split" shards at any time for adjustable 
stream throughput capacity. This article explains this quite clearly: 
https://brandur.org/kinesis-by-example.

This will break the static shard-to-task mapping implemented in the basic 
version of the Kinesis consumer 
(https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
mapping is done in a simple round-robin-like distribution which can be locally 
determined at each Flink consumer task (Flink Kafka consumer does this too).
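As a concrete illustration, the locally-determined round-robin distribution can be sketched as below (a Python sketch with hypothetical shard IDs; the actual FLINK-3229 implementation is in Java and may differ in details):

```python
def assign_shards(shards, num_subtasks, subtask_index):
    """Locally determine the shards owned by one consumer subtask.

    Every subtask sorts the same global shard list and takes every
    num_subtasks-th entry, so no coordination is needed -- but the
    result is only valid while the shard list never changes, which
    is exactly what Kinesis resharding breaks.
    """
    ordered = sorted(shards)  # identical order on every subtask
    return [s for i, s in enumerate(ordered) if i % num_subtasks == subtask_index]

# Hypothetical shard IDs for a 5-shard stream read by 2 subtasks.
shards = ["shardId-000", "shardId-001", "shardId-002", "shardId-003", "shardId-004"]
print(assign_shards(shards, 2, 0))  # shards at positions 0, 2, 4
print(assign_shards(shards, 2, 1))  # shards at positions 1, 3
```

Each subtask runs this computation independently; a merge or split changes the shard list and silently invalidates every subtask's result, which is why a coordinated reassignment is needed.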

To handle Kinesis resharding, we will need a way for the Flink consumer tasks 
to coordinate which shards they are currently handling, and to let a task ask 
the coordinator for a shard reassignment when it encounters a closed shard at 
runtime (Kinesis closes shards when they are merged or split).

We need a centralized coordinator state store that is visible to all Flink 
consumer tasks. Tasks can use this state store to locally determine which 
shards they can be reassigned. Amazon KCL uses a DynamoDB table for this 
coordination, but as described in 
https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
KCL for the implementation of the consumer if we want to leverage Flink's 
checkpointing mechanics. For our own implementation, ZooKeeper could be used as 
this state store, but that would require the user to set up ZooKeeper as well.

Since this feature involves extensive work, it is opened as a separate 
sub-task from the basic implementation, 
https://issues.apache.org/jira/browse/FLINK-3229.

  was:
A big difference between Kinesis shards and Kafka partitions is that Kinesis 
users can choose to "merge" and "split" shards at any time for adjustable 
stream throughput capacity. This article explains this quite clearly: 
https://brandur.org/kinesis-by-example.

This will break the static shard-to-task mapping implemented in the basic 
version of the Kinesis consumer 
(https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
mapping is done in a simple round-robin-like distribution which can be locally 
determined at each Flink consumer task (Flink Kafka consumer does this too).

To handle Kinesis resharding, we will need some way to let the Flink consumer 
tasks coordinate which shards they are currently handling, and allow the tasks 
to ask the coordinator for a shards reassignment when the task finds out it has 
found a closed shard at runtime (shards will be closed by Kinesis when it is 
merged and split).

We need a centralized coordinator state store which is visible to all Flink 
consumer tasks. Tasks can use this state store to locally determine what shards 
it can be reassigned. Amazon KCL uses a DynamoDB table for the coordination, 
but as described in https://issues.apache.org/jira/browse/FLINK-3229, we 
unfortunately can't use KCL for the implementation of the consumer if we want 
to leverage Flink's checkpointing mechanics. For our own implementation, 
Zookeeper can be used for this state store, but that means it would require the 
user to set up ZK to work.

Since this feature introduces extensive work, it is opened as a separate 
sub-task from the basic implementation 
https://issues.apache.org/jira/browse/FLINK-3229.


> Handle Kinesis-side resharding in Kinesis streaming consumer
> 
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>

[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-10 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15091408#comment-15091408
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


Thanks [~StephanEwen] for the response and help on JIRA permission assignment.
Noted on implementing the option for the user to decide where in the Kinesis 
stream to start reading ;)

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud-service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to Flink's handful of streaming connectors to external 
> systems and a great way to reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. The 
> AWS SDK can be used to consume Kinesis data, including reading a stream 
> beginning from a specific offset (or "record sequence number" in Kinesis 
> terminology). On the other hand, AWS officially recommends using KCL, which 
> offers a higher level of abstraction that also comes with checkpointing and 
> failure recovery, using a KCL-managed AWS DynamoDB "lease table" as the 
> checkpoint state storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) assignment and 
> checkpointing, letting the user focus only on streaming application logic. 
> This means we cannot use KCL to implement the Kinesis streaming connector if 
> we are aiming for a deep integration of Flink with Kinesis that provides 
> exactly-once guarantees (KCL promises only at-least-once). Therefore, the 
> AWS SDK will be the way to go for the implementation of this feature.
> With the ability to read from specific offsets, and given that Kinesis and 
> Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much 
> resemble the Kafka connector. We can basically follow the outline described 
> in [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to differences between Kinesis and 
> Kafka are described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector's per-broker connections, where each connection can 
> handle multiple Kafka partitions, the Kinesis connector will only need 
> simple per-shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector, which syncs an 
> outside view of progress to Kafka / ZK, we could probably use ZK or DynamoDB 
> the way KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the 
> proposed Kinesis consumer, I think it will be better to stick with the AWS 
> SDK for the implementation. The implementation should be straightforward, 
> being almost if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-10 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-3211:
--

Assignee: Tzu-Li (Gordon) Tai

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>





[jira] [Updated] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2016-06-05 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4022:
---
Description: 
Example: allow users to subscribe to "topic-n*", so that the consumer 
automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
added to Kafka.

I propose to implement this feature as follows:

Since the overall list of partitions to read will change after job submission, 
the main change required for this feature is dynamic partition assignment to 
subtasks while the Kafka consumer is running. This will mainly be accomplished 
using the Kafka 0.9.x API 
`KafkaConsumer#subscribe(java.util.regex.Pattern, ConsumerRebalanceListener)`. 
The KafkaConsumer in each subtask will be added to the same consumer group 
when instantiated, relying on Kafka to dynamically reassign partitions to them 
whenever a rebalance happens. The registered `ConsumerRebalanceListener` is a 
callback that is called right before and after rebalancing happens. We'll use 
this callback to let each subtask commit the last offsets of the partitions it 
is currently responsible for to an external store (or Kafka) before a 
rebalance; after the rebalance, once each subtask gets the new partitions it 
will be reading from, it reads from the external store to get the last offsets 
for its new partitions (partitions which don't have offset entries in the 
store are new partitions that caused the rebalancing).
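The commit-before-rebalance / read-after-rebalance handshake described above can be modeled without a running Kafka cluster; the classes below are simplified Python stand-ins for the external offset store and the `ConsumerRebalanceListener` callback, not the actual Kafka API:

```python
class OffsetStore:
    """Stand-in for the external store (or Kafka) holding committed offsets."""
    def __init__(self):
        self.offsets = {}
    def commit(self, partition, offset):
        self.offsets[partition] = offset
    def lookup(self, partition):
        # Partitions with no entry are new partitions that caused the rebalance.
        return self.offsets.get(partition)

class RebalanceListener:
    """Models ConsumerRebalanceListener: commit on revoke, restore on assign."""
    def __init__(self, store, current_offsets):
        self.store = store
        self.current = current_offsets  # partition -> last processed offset
    def on_partitions_revoked(self, partitions):
        # Called right before the rebalance: persist progress externally.
        for p in partitions:
            self.store.commit(p, self.current.pop(p))
    def on_partitions_assigned(self, partitions):
        # Called right after the rebalance: restore progress for new partitions.
        for p in partitions:
            restored = self.store.lookup(p)
            # New partitions (no stored offset) start from the beginning.
            self.current[p] = restored if restored is not None else 0

store = OffsetStore()
subtask = RebalanceListener(store, {"topic-n1/0": 42})
subtask.on_partitions_revoked(["topic-n1/0"])                 # commit 42
subtask.on_partitions_assigned(["topic-n1/0", "topic-n2/0"])  # 42 restored, n2 starts at 0
print(subtask.current)
```

The partition names and start-at-zero default are illustrative only; the real connector would use Kafka's TopicPartition objects and the configured start position.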

The tricky part will be restoring Flink checkpoints when the partition 
assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
the offsets of the partitions they are currently holding. Restoring will be a 
bit different, in that subtasks might not be assigned the same partitions as 
in the snapshot they are restored with (since we're letting Kafka dynamically 
assign partitions). There will need to be a coordination process where, if a 
restore state exists, all subtasks first commit the offsets they receive (as a 
result of the restore state) to the external store, and then all subtasks look 
up the last offsets for the partitions they are currently holding.

However, if the globally merged restore state feature mentioned by 
[~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
available, then the restore will be simple again, as each subtask has full 
access to the previous global state, and therefore no coordination is required.

I think changing to dynamic partition assignment is also good in the long run 
for handling topic repartitioning.

Overall,

User-facing API changes:

- New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
DeserializationSchema, Properties)
- New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
KeyedDeserializationSchema, Properties)


Implementation changes:

1. Dynamic partition assignment depending on KafkaConsumer#subscribe
- Remove partition list querying from the constructor
- Remove static partition assignment to subtasks in run()
- Instead of using KafkaConsumer#assign() in fetchers to manually assign static 
partitions, use subscribe() registered with the callback implementation 
explained above.

2. Restoring from checkpointed states
- Snapshotting should remain unchanged
- Restoring requires subtasks to coordinate the restored offsets they hold 
before continuing (unless we are able to have merged restore states).

3. For the previous consumer functionality (consuming from a fixed list of 
topics), KafkaConsumer#subscribe() has a corresponding overload for a fixed 
list of topics. We can simply decide which subscribe() overload to use 
depending on whether a regex Pattern or a list of topics is supplied.

4. If a subtask initially has no assigned partitions, we shouldn't emit a 
MAX_VALUE watermark for it, since it may hold partitions after a rebalance. 
Instead, unassigned subtasks should also run a fetcher instance and take part 
in the process pool for the consumer group of the subscribed topics.
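Point 3's overload dispatch amounts to a type check; a hedged sketch (Python, with a fake consumer standing in for the real KafkaConsumer, whose subscribe() overloads would be resolved by the Java compiler instead):

```python
import re

class FakeConsumer:
    """Minimal stand-in recording which subscribe() overload was used."""
    def __init__(self):
        self.mode = None
    def subscribe_pattern(self, pattern, listener):
        self.mode = ("pattern", pattern.pattern)
    def subscribe_topics(self, topics, listener):
        self.mode = ("topics", topics)

def subscribe(consumer, topics_or_pattern, listener=None):
    # Dispatch on the argument type: regex Pattern vs. fixed topic list.
    if isinstance(topics_or_pattern, re.Pattern):
        consumer.subscribe_pattern(topics_or_pattern, listener)
    else:
        consumer.subscribe_topics(list(topics_or_pattern), listener)

c = FakeConsumer()
subscribe(c, re.compile("topic-n.*"))     # pattern subscription
print(c.mode)
subscribe(c, ["topic-a", "topic-b"])      # fixed-list subscription
print(c.mode)
```

The method names on FakeConsumer are hypothetical; the point is only that a single connector constructor can route to the right subscription mode from the argument type.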

  was:
Allow users to subscribe to "topic-n*", so that the consumer automatically 
reads from "topic-n1", "topic-n2", ... and so on as they are added to Kafka.

I propose to implement this feature by the following description:

Since the overall list of partitions to read will change after job submission, 
the main big change required for this feature will be dynamic partition 
assignment to subtasks while the Kafka consumer is running. This will mainly be 
accomplished using Kafka 0.9.x API 
`KafkaConsumer#subscribe(java.util.regex.Pattern, ConsumerRebalanceListener)`. 
Each KafkaConsumers in each subtask will be added to the same consumer group 
when instantiated, and rely on Kafka to dynamically reassign partitions to them 
whenever a rebalance happens. The registered `ConsumerRebalanceListener` is a 
callback that is called right before and after rebalancing happens. We'll use 
this callback to let 

[jira] [Created] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2016-06-05 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-4022:
--

 Summary: Partition discovery / regex topic subscription for the 
Kafka consumer
 Key: FLINK-4022
 URL: https://issues.apache.org/jira/browse/FLINK-4022
 Project: Flink
  Issue Type: New Feature
Affects Versions: 1.0.0
Reporter: Tzu-Li (Gordon) Tai
 Fix For: 1.1.0


Allow users to subscribe to "topic-n*", so that the consumer automatically 
reads from "topic-n1", "topic-n2", ... and so on as they are added to Kafka.

I propose to implement this feature by the following description:

Since the overall list of partitions to read will change after job submission, 
the main big change required for this feature will be dynamic partition 
assignment to subtasks while the Kafka consumer is running. This will mainly be 
accomplished using Kafka 0.9.x API 
`KafkaConsumer#subscribe(java.util.regex.Pattern, ConsumerRebalanceListener)`. 
Each KafkaConsumers in each subtask will be added to the same consumer group 
when instantiated, and rely on Kafka to dynamically reassign partitions to them 
whenever a rebalance happens. The registered `ConsumerRebalanceListener` is a 
callback that is called right before and after rebalancing happens. We'll use 
this callback to let each subtask commit its last offsets of partitions its 
currently responsible of to an external store (or Kafka) before a rebalance; 
after rebalance and the substasks gets the new partitions it'll be reading 
from, they'll read from the external store to get the last offsets for their 
new partitions (partitions which don't have offset entries in the store are new 
partitions causing the rebalancing).

The tricky part will be restoring Flink checkpoints when the partition 
assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
the offsets of partitions they are currently holding. Restoring will be  a bit 
different in that subtasks might not be assigned matching partitions to the 
snapshot the subtask is restored with (since we're letting Kafka dynamically 
assign partitions). There will need to be a coordination process where, if a 
restore state exists, all subtasks first commit the offsets they receive (as a 
result of the restore state) to the external store, and then all subtasks 
attempt to find a last offset for the partitions it is holding.

However, if the globally merged restore state feature mentioned by 
[~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
available, then the restore will be simple again, as each subtask has full 
access to previous global state therefore coordination is not required.

I think changing to dynamic partition assignment is also good in the long run 
for handling topic repartitioning.

Overall,

User-facing API changes:

- New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
DeserializationSchema, Properties)
- New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
KeyedDeserializationSchema, Properties)


Implementation changes:

1. Dynamic partition assigning depending on KafkaConsumer#subscribe
- Remove partition list querying from constructor
- Remove static partition assigning to substasks in run()
- Instead of using KafkaConsumer#assign() in fetchers to manually assign static 
partitions, use subscribe() registered with the callback implementation 
explained above.

2. Restoring from checkpointed states
- Snapshotting should remain unchanged
- Restoring requires subtasks to coordinate the restored offsets they hold 
before continuing (unless we are able to have merged restore states).

3. For previous consumer functionality (consume from fixed list of topics), the 
KafkaConsumer#subscribe() has a corresponding overload method for fixed list of 
topics. We can simply decide which subscribe() overload to use depending on 
whether a regex Pattern or list of topics is supplied.





[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2016-06-05 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15315787#comment-15315787
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4022:


I'd like to start working on this feature if it's ok with [~rmetzger], after 
confirming that this implementation is acceptable.

It'll introduce more complexity for state restoring for now, but once merged 
restore states are available it'll be simple again.
On the other hand, if we want to stick to statically defined partition 
assignments, then the complexity will probably be in implementing the 
coordination for discovering new partitions / rebalancing between subtasks 
ourselves.

> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>





[jira] [Updated] (FLINK-4020) Remove shard list querying from Kinesis consumer constructor

2016-06-05 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4020:
---
Description: Currently FlinkKinesisConsumer queries for the whole list of 
shards in the constructor, forcing the client to be able to access Kinesis as 
well. This is also a drawback for handling Kinesis-side resharding, since we'd 
want all shard listing / shard-to-task assignment / shard-end (a result of 
resharding) handling logic to be done independently within the task life cycle 
methods, with defined and definite results.  (was: Currently 
FlinkKinesisConsumer is querying for the whole list of shards in the 
constructor, forcing the client to be able to access Kinesis as well. This is 
also a drawback for handling Kinesis-side resharding, since we'd want all shard 
listing / shard-to-task assigning / shard end (result of resharding) handling 
logic to be capable of being independently done within task life cycle methods, 
with defined and definite results.

Main thing to overcome is coordination between parallel subtasks. All subtasks 
will need to retry (due to Amazon's operation rate limits) until all subtasks 
have succeeded. We could probably use either ZK or Amazon DynamoDB (user 
configurable) for coordinating subtask status.)

> Remove shard list querying from Kinesis consumer constructor
> 
>
> Key: FLINK-4020
> URL: https://issues.apache.org/jira/browse/FLINK-4020
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>
> Currently FlinkKinesisConsumer is querying for the whole list of shards in 
> the constructor, forcing the client to be able to access Kinesis as well. 
> This is also a drawback for handling Kinesis-side resharding, since we'd want 
> all shard listing / shard-to-task assigning / shard end (result of 
> resharding) handling logic to be capable of being independently done within 
> task life cycle methods, with defined and definite results.





[jira] [Assigned] (FLINK-4020) Remove shard list querying from Kinesis consumer constructor

2016-06-05 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-4020:
--

Assignee: Tzu-Li (Gordon) Tai

> Remove shard list querying from Kinesis consumer constructor
> 
>
> Key: FLINK-4020
> URL: https://issues.apache.org/jira/browse/FLINK-4020
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Currently FlinkKinesisConsumer is querying for the whole list of shards in 
> the constructor, forcing the client to be able to access Kinesis as well. 
> This is also a drawback for handling Kinesis-side resharding, since we'd want 
> all shard listing / shard-to-task assigning / shard end (result of 
> resharding) handling logic to be capable of being independently done within 
> task life cycle methods, with defined and definite results.





[jira] [Created] (FLINK-4033) Missing Scala example snippets for the Kinesis Connector documentation

2016-06-08 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-4033:
--

 Summary: Missing Scala example snippets for the Kinesis Connector 
documentation
 Key: FLINK-4033
 URL: https://issues.apache.org/jira/browse/FLINK-4033
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation, Kinesis Connector, Streaming Connectors
Reporter: Tzu-Li (Gordon) Tai
Priority: Minor
 Fix For: 1.1.0


The documentation for the Kinesis connector is missing the Scala versions of 
the example snippets.





[jira] [Commented] (FLINK-3923) Unify configuration conventions of the Kinesis producer to the same as the consumer

2016-06-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15317860#comment-15317860
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3923:


I'm wondering whether it might be a better idea to use dedicated configuration 
classes, i.e. FlinkKinesisConsumerConfiguration and 
FlinkKinesisProducerConfiguration.
Both take the required AWS connection info (region, credentials) as 
constructor args, and then use cascading set methods for additional settings.

For example for the consumer,
.setInitialPosition()
.setDescribeStreamBackfireMillis()
.setWatermarkAssigner()
... (any other config we may add in the future)

The configuration classes will be responsible for setting the default values of 
this optional settings (behaviour of reading default values when not set is 
kind of floppy right now).

What do you think?
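A minimal sketch of what such a dedicated configuration class could look like. The class name, the setter names, and the default values below are assumptions for illustration, not an actual Flink API:

```java
import java.util.Objects;

// Hypothetical sketch of a dedicated consumer configuration class with
// cascading ("fluent") setters; names and defaults are illustrative only.
public class FlinkKinesisConsumerConfiguration {
    private final String region;
    private final String accessKeyId;
    private final String secretKey;

    // Optional settings; the configuration class owns their defaults.
    private String initialPosition = "LATEST";
    private long describeStreamBackoffMillis = 1000L;

    public FlinkKinesisConsumerConfiguration(String region, String accessKeyId, String secretKey) {
        // Required AWS connection info comes in through the constructor.
        this.region = Objects.requireNonNull(region);
        this.accessKeyId = Objects.requireNonNull(accessKeyId);
        this.secretKey = Objects.requireNonNull(secretKey);
    }

    // Cascading setters return `this` so calls can be chained.
    public FlinkKinesisConsumerConfiguration setInitialPosition(String initialPosition) {
        this.initialPosition = initialPosition;
        return this;
    }

    public FlinkKinesisConsumerConfiguration setDescribeStreamBackoffMillis(long millis) {
        this.describeStreamBackoffMillis = millis;
        return this;
    }

    public String getInitialPosition() { return initialPosition; }
    public long getDescribeStreamBackoffMillis() { return describeStreamBackoffMillis; }

    public static void main(String[] args) {
        FlinkKinesisConsumerConfiguration config =
                new FlinkKinesisConsumerConfiguration("us-east-1", "myAccessKey", "mySecretKey")
                        .setInitialPosition("TRIM_HORIZON");
        // Unset options fall back to defaults owned by the class itself.
        System.out.println(config.getInitialPosition() + " " + config.getDescribeStreamBackoffMillis());
    }
}
```

This would centralize default handling in one place instead of spreading "read property, fall back to default" logic across the consumer code.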

> Unify configuration conventions of the Kinesis producer to the same as the 
> consumer
> ---
>
> Key: FLINK-3923
> URL: https://issues.apache.org/jira/browse/FLINK-3923
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Abdullah Ozturk
> Fix For: 1.1.0
>
>
> Currently, the Kinesis consumer and producer are configured differently.
> The producer expects a list of arguments for the access key, secret, region, 
> stream. The consumer is accepting properties (similar to the Kafka connector).
> The objective of this issue is to change the producer so that it is also 
> using a properties-based configuration (including an input validation step)





[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-09 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15322524#comment-15322524
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3231:


Update for this:
Had a talk with [~rmetzger] about the implementation. We should be able to 
avoid inter-subtask coordination and any external state store by letting all 
subtasks poll for the new shards each subtask should be in charge of 
consuming, based on a hash of the stream name + shard id of each shard.
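The hash-based assignment could be sketched as follows; the class and method names are illustrative, not the actual implementation:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of coordination-free shard assignment: every subtask polls the full
// shard list and keeps only the shards whose hash maps to its own index.
public class ShardAssigner {

    // A subtask owns a shard iff hash(streamName + shardId) mod parallelism
    // equals its subtask index; every subtask computes this locally and all
    // of them arrive at the same disjoint, complete assignment.
    public static boolean isOwnedBy(String stream, String shardId, int subtaskIndex, int parallelism) {
        int hash = Math.abs((stream + shardId).hashCode() % parallelism);
        return hash == subtaskIndex;
    }

    public static List<String> ownedShards(String stream, List<String> shardIds,
                                           int subtaskIndex, int parallelism) {
        List<String> owned = new ArrayList<>();
        for (String shardId : shardIds) {
            if (isOwnedBy(stream, shardId, subtaskIndex, parallelism)) {
                owned.add(shardId);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        List<String> shards = Arrays.asList("shardId-000", "shardId-001", "shardId-002");
        for (int i = 0; i < 2; i++) {
            System.out.println("subtask " + i + " -> " + ownedShards("myStream", shards, i, 2));
        }
    }
}
```

Because the rule is deterministic, a newly discovered shard (after a reshard) is picked up by exactly one subtask without any external coordination.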

> Handle Kinesis-side resharding in Kinesis streaming consumer
> 
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time for adjustable 
> stream throughput capacity. This article explains this quite clearly: 
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
> mapping is done in a simple round-robin-like distribution which can be 
> locally determined at each Flink consumer task (Flink Kafka consumer does 
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer 
> tasks coordinate which shards they are currently handling, and allow the 
> tasks to ask the coordinator for a shard reassignment when a task finds a 
> closed shard at runtime (shards are closed by Kinesis when they are merged or 
> split).
> We need a centralized coordinator state store which is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine which 
> shards they can be reassigned. Amazon KCL uses a DynamoDB table for the 
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
> KCL for the implementation of the consumer if we want to leverage Flink's 
> checkpointing mechanics. For our own implementation, ZooKeeper could be used 
> for this state store, but that would also require the user to set up ZK.
> Since this feature introduces extensive work, it is opened as a separate 
> sub-task from the basic implementation 
> https://issues.apache.org/jira/browse/FLINK-3229.





[jira] [Assigned] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-09 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-3231:
--

Assignee: Tzu-Li (Gordon) Tai

> Handle Kinesis-side resharding in Kinesis streaming consumer
> 
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time for adjustable 
> stream throughput capacity. This article explains this quite clearly: 
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
> mapping is done in a simple round-robin-like distribution which can be 
> locally determined at each Flink consumer task (Flink Kafka consumer does 
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer 
> tasks coordinate which shards they are currently handling, and allow the 
> tasks to ask the coordinator for a shard reassignment when a task finds a 
> closed shard at runtime (shards are closed by Kinesis when they are merged or 
> split).
> We need a centralized coordinator state store which is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine which 
> shards they can be reassigned. Amazon KCL uses a DynamoDB table for the 
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
> KCL for the implementation of the consumer if we want to leverage Flink's 
> checkpointing mechanics. For our own implementation, ZooKeeper could be used 
> for this state store, but that would also require the user to set up ZK.
> Since this feature introduces extensive work, it is opened as a separate 
> sub-task from the basic implementation 
> https://issues.apache.org/jira/browse/FLINK-3229.





[jira] [Commented] (FLINK-3923) Unify configuration conventions of the Kinesis producer to the same as the consumer

2016-06-07 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15318272#comment-15318272
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3923:


The consumer isn't using the KCL at the moment, so default overriding wouldn't 
be a problem. As for the KPL in the producer, I've heard from Robert that he's 
already talked with Amazon and come to some conclusions. He's probably still a 
bit busy early this week, but should update us with the conclusions soon.

> Unify configuration conventions of the Kinesis producer to the same as the 
> consumer
> ---
>
> Key: FLINK-3923
> URL: https://issues.apache.org/jira/browse/FLINK-3923
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>Assignee: Abdullah Ozturk
> Fix For: 1.1.0
>
>
> Currently, the Kinesis consumer and producer are configured differently.
> The producer expects a list of arguments for the access key, secret, region, 
> stream. The consumer is accepting properties (similar to the Kafka connector).
> The objective of this issue is to change the producer so that it is also 
> using a properties-based configuration (including an input validation step)





[jira] [Commented] (FLINK-4008) Hello,I am not able to create jar of flink-streaming-connectors ...I am able to create jar of others like twitter,kafka,flume but I am not able to create jar of flink-s

2016-06-07 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15320019#comment-15320019
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4008:


Don't build only under the kinesis connector module.
Either build from the root of the flink project, or from 
flink-streaming-connectors if you've already built the whole project before.
Also, you currently need to activate the `include-kinesis` profile to build 
the Kinesis connector, i.e. `mvn clean package -Pinclude-kinesis`.

> Hello,I am not able to create jar of flink-streaming-connectors ...I am able 
> to create jar of others like twitter,kafka,flume but I am not able to create 
> jar of flink-streaming connectors ?? How can I create this jar ??
> ---
>
> Key: FLINK-4008
> URL: https://issues.apache.org/jira/browse/FLINK-4008
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Akshay Shingote
>
> I filed an issue here https://github.com/apache/flink/pull/2058 ... I want to 
> know how can we create jar of Flink-Streaming-Connectors ??





[jira] [Commented] (FLINK-4018) Configurable idle time between getRecords requests to Kinesis shards

2016-06-03 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15315290#comment-15315290
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4018:


We have a user who has tried out the connector and is preparing to go to 
production with Flink & Kinesis next week. The only worrying thing is this 
issue. I'll open a PR within the next 24 hours for review.

> Configurable idle time between getRecords requests to Kinesis shards
> 
>
> Key: FLINK-4018
> URL: https://issues.apache.org/jira/browse/FLINK-4018
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Currently, the Kinesis consumer calls getRecords() right after finishing the 
> previous call. This easily hits Amazon's limit of 5 GET requests per shard 
> per second. Although the code already has a backoff & retry mechanism, this 
> will still affect other applications consuming from the same Kinesis stream.
> Along with this new configuration and the already existing 
> `KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET`, users will have more 
> control over the desired throughput behaviour of the Kinesis consumer.





[jira] [Created] (FLINK-4018) Configurable idle time between getRecords requests to Kinesis shards

2016-06-03 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-4018:
--

 Summary: Configurable idle time between getRecords requests to 
Kinesis shards
 Key: FLINK-4018
 URL: https://issues.apache.org/jira/browse/FLINK-4018
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Affects Versions: 1.1.0
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai


Currently, the Kinesis consumer calls getRecords() right after finishing the 
previous call. This easily hits Amazon's limit of 5 GET requests per shard per 
second. Although the code already has a backoff & retry mechanism, this will 
still affect other applications consuming from the same Kinesis stream.

Along with this new configuration and the already existing 
`KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET`, users will have more 
control over the desired throughput behaviour of the Kinesis consumer.
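A rough sketch of what a fetch loop with a configurable idle time could look like; the class and method names here are assumptions for illustration, not the actual consumer code:

```java
// Sketch of a per-shard fetch loop that sleeps a configurable idle time
// between getRecords() calls, so the per-shard request rate stays under
// Amazon's limit of 5 GET requests per shard per second.
public class ShardFetchLoopSketch {

    public static long runLoop(int iterations, long idleMillis) throws InterruptedException {
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            // A real consumer would call getRecords(shardIterator, maxRecords)
            // here and forward the returned records downstream.
            Thread.sleep(idleMillis);  // configurable idle time between calls
        }
        return (System.nanoTime() - start) / 1_000_000;  // elapsed millis
    }

    public static void main(String[] args) throws InterruptedException {
        // With a 200 ms idle time, at most ~5 getRecords() calls per second.
        long elapsed = runLoop(5, 200L);
        System.out.println("5 iterations took ~" + elapsed + " ms");
    }
}
```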





[jira] [Created] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-06-04 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-4019:
--

 Summary: Expose approximateArrivalTimestamp through the 
KinesisDeserializationSchema interface
 Key: FLINK-4019
 URL: https://issues.apache.org/jira/browse/FLINK-4019
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Affects Versions: 1.1.0
Reporter: Tzu-Li (Gordon) Tai


Amazon's Record class also exposes the timestamp at which Kinesis successfully 
received the record: 
http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html#getApproximateArrivalTimestamp().

This information should be useful to users and should be exposed through the 
deserialization schema.
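One possible shape for exposing the timestamp through the schema; the interface name and the exact method signature below are assumptions, not the final Flink API:

```java
import java.nio.charset.StandardCharsets;

// Sketch of a deserialization schema interface that hands user code the
// approximate arrival timestamp alongside the record payload and metadata.
public class TimestampedDeserializationSketch {

    interface KinesisDeserializationSchema<T> {
        T deserialize(byte[] recordValue, String partitionKey, String sequenceNumber,
                      long approxArrivalTimestampMillis, String stream, String shardId);
    }

    // Example schema that pairs the payload with its arrival timestamp.
    static class StringWithTimestampSchema implements KinesisDeserializationSchema<String> {
        @Override
        public String deserialize(byte[] recordValue, String partitionKey, String sequenceNumber,
                                  long approxArrivalTimestampMillis, String stream, String shardId) {
            return new String(recordValue, StandardCharsets.UTF_8) + "@" + approxArrivalTimestampMillis;
        }
    }

    public static void main(String[] args) {
        String value = new StringWithTimestampSchema().deserialize(
                "hello".getBytes(StandardCharsets.UTF_8), "pk", "seq-1", 1465000000000L,
                "myStream", "shardId-000");
        System.out.println(value);  // hello@1465000000000
    }
}
```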





[jira] [Created] (FLINK-4020) Remove shard list querying from Kinesis consumer constructor

2016-06-04 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-4020:
--

 Summary: Remove shard list querying from Kinesis consumer 
constructor
 Key: FLINK-4020
 URL: https://issues.apache.org/jira/browse/FLINK-4020
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Reporter: Tzu-Li (Gordon) Tai


Currently, FlinkKinesisConsumer queries the whole list of shards in the 
constructor, forcing the client to be able to access Kinesis as well. This is 
also a drawback for handling Kinesis-side resharding, since we'd want all 
shard listing, shard-to-task assignment, and shard-end (a result of 
resharding) handling logic to be performed independently within the task life 
cycle methods, with well-defined results.





[jira] [Created] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records

2016-06-15 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-4080:
--

 Summary: Kinesis consumer not exactly-once if stopped in the 
middle of processing aggregated records
 Key: FLINK-4080
 URL: https://issues.apache.org/jira/browse/FLINK-4080
 Project: Flink
  Issue Type: Sub-task
  Components: Kinesis Connector, Streaming Connectors
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
Priority: Critical
 Fix For: 1.1.0


I've occasionally seen the ManualExactlyOnceTest fail after several tries.

Kinesis records of the same aggregated batch will have the same sequence 
number but different sub-sequence numbers 
(http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
The consumer currently commits state every time it finishes processing a 
record, even de-aggregated ones. This is a bug, since it incorrectly marks all 
remaining records of the de-aggregated batch as processed in the state.

Proposed fix: Use the extended `UserRecord` class in KCL to represent all 
records (either non- or de-aggregated) instead of the basic `Record` class. 
This gives access to whether or not the record was originally aggregated. If 
we encounter a de-aggregated record, don't update state until we have finished 
processing the last record of the batch.
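The proposed fix could be sketched as follows, with a minimal stand-in class in place of KCL's `UserRecord`; the field and method names are illustrative only:

```java
// Sketch of deferring the state commit: only advance the checkpointed
// sequence number when a record is non-aggregated, or is the last sub-record
// of its de-aggregated batch. `Rec` stands in for KCL's UserRecord, which
// exposes whether a record came from an aggregated batch.
public class DeferredCommitSketch {

    static class Rec {
        final String sequenceNumber;
        final boolean aggregated;
        final boolean lastSubRecord;  // last sub-record of its batch

        Rec(String sequenceNumber, boolean aggregated, boolean lastSubRecord) {
            this.sequenceNumber = sequenceNumber;
            this.aggregated = aggregated;
            this.lastSubRecord = lastSubRecord;
        }
    }

    private String committedSequenceNumber;  // state used for checkpoints

    void process(Rec record) {
        // ... emit the record downstream ...
        // Committing mid-batch would wrongly mark the remaining sub-records
        // of the batch as processed, so only commit at a safe point.
        if (!record.aggregated || record.lastSubRecord) {
            committedSequenceNumber = record.sequenceNumber;
        }
    }

    String committed() { return committedSequenceNumber; }

    public static void main(String[] args) {
        DeferredCommitSketch consumer = new DeferredCommitSketch();
        consumer.process(new Rec("5", true, false));   // mid-batch: no commit yet
        System.out.println(consumer.committed());      // null
        consumer.process(new Rec("5", true, true));    // last sub-record: commit
        System.out.println(consumer.committed());      // 5
    }
}
```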





[jira] [Updated] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records

2016-06-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4080:
---
Description: 
I've occasionally seen the ManualExactlyOnceTest fail after several tries.

Kinesis records of the same aggregated batch will have the same sequence 
number but different sub-sequence numbers 
(http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
The current code of the consumer commits state every time it finishes 
processing a record, even de-aggregated ones. This is a bug, since it 
incorrectly marks all remaining records of the de-aggregated batch as 
processed in the state.

Proposed fix: Use the extended `UserRecord` class in KCL to represent all 
records (either non- or de-aggregated) instead of the basic `Record` class. 
This gives access to whether or not the record was originally aggregated. If 
we encounter a de-aggregated record, don't update state until we have finished 
processing the last record of the batch.

  was:
I've occasionally experienced unsuccessful ManualExactlyOnceTest after several 
tries.

Kinesis records of the same aggregated batch will have the same sequence 
number, and different sub-sequence numbers 
(http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
 The current state of the consumer is committing state every time it finishes 
processing a record, even de-aggregated ones. This is a bug since this will 
incorrectly mark all remaining records of the de-aggregated batch as processed 
in the state.

Proposed fix: Use the extended `UserRecord` class in KCL to represent all 
records (either non- or de-aggregated) instead of the basic `Record` class. 
This gives access to whether or not the record was originally aggregated. If we 
encounter a de-aggregated record, don't update state until we finished 
processing the last record of the batch.


> Kinesis consumer not exactly-once if stopped in the middle of processing 
> aggregated records
> ---
>
> Key: FLINK-4080
> URL: https://issues.apache.org/jira/browse/FLINK-4080
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.1.0
>
>
> I've occasionally seen the ManualExactlyOnceTest fail after several tries.
> Kinesis records of the same aggregated batch will have the same sequence 
> number but different sub-sequence numbers 
> (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
> The current code of the consumer commits state every time it finishes 
> processing a record, even de-aggregated ones. This is a bug, since it 
> incorrectly marks all remaining records of the de-aggregated batch as 
> processed in the state.
> Proposed fix: Use the extended `UserRecord` class in KCL to represent all 
> records (either non- or de-aggregated) instead of the basic `Record` class. 
> This gives access to whether or not the record was originally aggregated. If 
> we encounter a de-aggregated record, don't update state until we have 
> finished processing the last record of the batch.





[jira] [Updated] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records

2016-06-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4080:
---
Description: 
I've occasionally seen the ManualExactlyOnceTest fail after several tries.

Kinesis records of the same aggregated batch will have the same sequence 
number but different sub-sequence numbers 
(http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
The current code of the consumer commits state every time it finishes 
processing a record, even de-aggregated ones. This is a bug, since it 
incorrectly marks all remaining records of the de-aggregated batch as 
processed in the state.

Proposed fix:
1. Use the extended `UserRecord` class in KCL to represent all records (either 
non- or de-aggregated) instead of the basic `Record` class. This gives access 
to whether or not the record was originally aggregated.
2. The sequence number state we checkpoint needs to be able to indicate that 
the last seen sequence number of a shard may refer to a sub-record of an 
aggregated record, e.g., {"shard0" -> "5:8", "shard1" -> "2"} meaning the 8th 
sub-record of the 5th record was last seen for shard 0. On restore, we start 
again from record 5 for shard 0 and skip the first 7 sub-records; for shard 1, 
however, we start from record 3, since record 2 is non-aggregated and already 
fully processed.

  was:
I've occasionally experienced unsuccessful ManualExactlyOnceTest after several 
tries.

Kinesis records of the same aggregated batch will have the same sequence 
number, and different sub-sequence numbers 
(http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
 The current code of the consumer is committing state every time it finishes 
processing a record, even de-aggregated ones. This is a bug since this will 
incorrectly mark all remaining records of the de-aggregated batch as processed 
in the state.

Proposed fix: Use the extended `UserRecord` class in KCL to represent all 
records (either non- or de-aggregated) instead of the basic `Record` class. 
This gives access to whether or not the record was originally aggregated. If we 
encounter a de-aggregated record, don't update state until we finished 
processing the last record of the batch.


> Kinesis consumer not exactly-once if stopped in the middle of processing 
> aggregated records
> ---
>
> Key: FLINK-4080
> URL: https://issues.apache.org/jira/browse/FLINK-4080
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.1.0
>
>
> I've occasionally seen the ManualExactlyOnceTest fail after several tries.
> Kinesis records of the same aggregated batch will have the same sequence 
> number but different sub-sequence numbers 
> (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
> The current code of the consumer commits state every time it finishes 
> processing a record, even de-aggregated ones. This is a bug, since it 
> incorrectly marks all remaining records of the de-aggregated batch as 
> processed in the state.
> Proposed fix:
> 1. Use the extended `UserRecord` class in KCL to represent all records 
> (either non- or de-aggregated) instead of the basic `Record` class. This 
> gives access to whether or not the record was originally aggregated.
> 2. The sequence number state we checkpoint needs to be able to indicate that 
> the last seen sequence number of a shard may refer to a sub-record of an 
> aggregated record, e.g., {"shard0" -> "5:8", "shard1" -> "2"} meaning the 
> 8th sub-record of the 5th record was last seen for shard 0. On restore, we 
> start again from record 5 for shard 0 and skip the first 7 sub-records; for 
> shard 1, however, we start from record 3, since record 2 is non-aggregated 
> and already fully processed.





[jira] [Commented] (FLINK-4023) Move Kafka consumer partition discovery from constructor to open()

2016-06-13 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328831#comment-15328831
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4023:


https://issues.apache.org/jira/browse/FLINK-4069
The above is a duplicate of this JIRA, opened after this one. I'm referencing 
the link here since the description in FLINK-4069 covers additional 
information on the problem, as well as a link to a related discussion thread 
on the mailing list.

> Move Kafka consumer partition discovery from constructor to open()
> --
>
> Key: FLINK-4023
> URL: https://issues.apache.org/jira/browse/FLINK-4023
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Michal Harish
>Priority: Minor
>  Labels: kafka, kafka-0.8
>
> Currently, Flink queries Kafka for partition information when creating the 
> Kafka consumer. This is done on the client when submitting the Flink job, 
> which requires the client to be able to fetch the partition data from Kafka 
> which may only be accessible from the cluster environment where the tasks 
> will be running. Moving the partition discovery to the open() method should 
> solve this problem.
>  





[jira] [Commented] (FLINK-4069) Kafka Consumer should not initialize on construction

2016-06-13 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328840#comment-15328840
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4069:


Thanks for creating this JIRA Shannon. However, there's already a previously 
opened JIRA on this problem: https://issues.apache.org/jira/browse/FLINK-4023. 
Let's track this issue on FLINK-4023 on close this as a duplicate issue :) I've 
referenced a link to this JIRA on FLINK-4023.

> Kafka Consumer should not initialize on construction
> 
>
> Key: FLINK-4069
> URL: https://issues.apache.org/jira/browse/FLINK-4069
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Shannon Carey
>
> The Kafka Consumer connector currently interacts over the network with Kafka 
> in order to get partition metadata when the class is constructed. Instead, it 
> should do that work when the job actually begins to run (for example, in 
> AbstractRichFunction#open() of FlinkKafkaConsumer0?).
> The main weakness of broker querying in the constructor is that if there are 
> network problems, Flink might take a long time (eg. ~1hr) inside the 
> user-supplied main() method while it attempts to contact each broker and 
> perform retries. In general, setting up the Kafka partitions does not seem 
> strictly necessary as part of execution of main() in order to set up the job 
> plan/topology.
> However, as Robert Metzger mentions, there are important concerns with how 
> Kafka partitions are handled:
> "The main reason why we do the querying centrally is:
> a) avoid overloading the brokers
> b) send the same list of partitions (in the same order) to all parallel 
> consumers to do a fixed partition assignments (also across restarts). When we 
> do the querying in the open() method, we need to make sure that all 
> partitions are assigned, without duplicates (also after restarts in case of 
> failures)."
> See also the mailing list discussion: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/API-request-to-submit-job-takes-over-1hr-td7319.html
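The constructor-vs-open() split this issue (and FLINK-4023) asks for could be sketched as below. `fetchPartitions()` is a stand-in for the real broker metadata query, and the modulo rule is one illustrative way to satisfy Robert's concern that all subtasks must agree on a duplicate-free assignment:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: record configuration in the constructor, defer the partition
// metadata query to open(), which runs on the task managers.
public class LazyInitConsumerSketch {

    private final String topic;
    private List<Integer> assignedPartitions;  // not populated until open()

    public LazyInitConsumerSketch(String topic) {
        // No network access here, so building the job graph on the client
        // cannot block for a long time on broker retries.
        this.topic = topic;
    }

    public void open(int subtaskIndex, int parallelism) {
        List<Integer> owned = new ArrayList<>();
        // Every subtask fetches the same list and applies the same rule, so
        // the overall assignment is complete and duplicate-free.
        for (int partition : fetchPartitions(topic)) {
            if (partition % parallelism == subtaskIndex) {
                owned.add(partition);
            }
        }
        this.assignedPartitions = owned;
    }

    public List<Integer> assignedPartitions() { return assignedPartitions; }

    static List<Integer> fetchPartitions(String topic) {
        return Arrays.asList(0, 1, 2, 3);  // stand-in for the metadata query
    }

    public static void main(String[] args) {
        for (int i = 0; i < 2; i++) {
            LazyInitConsumerSketch consumer = new LazyInitConsumerSketch("topic-n1");
            consumer.open(i, 2);
            System.out.println("subtask " + i + " -> " + consumer.assignedPartitions());
        }
    }
}
```

Consistency across restarts would additionally require the fetched partition list to be sorted the same way on every subtask.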





[jira] [Comment Edited] (FLINK-4069) Kafka Consumer should not initialize on construction

2016-06-13 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328840#comment-15328840
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4069 at 6/14/16 2:50 AM:
-

Thanks for creating this JIRA Shannon. However, there's already a previously 
opened JIRA on this problem: https://issues.apache.org/jira/browse/FLINK-4023. 
Let's track this issue on FLINK-4023 and close this as a duplicate issue :) 
I've referenced a link to this JIRA on FLINK-4023.


was (Author: tzulitai):
Thanks for creating this JIRA Shannon. However, there's already a previously 
opened JIRA on this problem: https://issues.apache.org/jira/browse/FLINK-4023. 
Let's track this issue on FLINK-4023 on close this as a duplicate issue :) I've 
referenced a link to this JIRA on FLINK-4023.

> Kafka Consumer should not initialize on construction
> 
>
> Key: FLINK-4069
> URL: https://issues.apache.org/jira/browse/FLINK-4069
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Shannon Carey
>
> The Kafka Consumer connector currently interacts over the network with Kafka 
> in order to get partition metadata when the class is constructed. Instead, it 
> should do that work when the job actually begins to run (for example, in 
> AbstractRichFunction#open() of FlinkKafkaConsumer0?).
> The main weakness of broker querying in the constructor is that if there are 
> network problems, Flink might take a long time (eg. ~1hr) inside the 
> user-supplied main() method while it attempts to contact each broker and 
> perform retries. In general, setting up the Kafka partitions does not seem 
> strictly necessary as part of execution of main() in order to set up the job 
> plan/topology.
> However, as Robert Metzger mentions, there are important concerns with how 
> Kafka partitions are handled:
> "The main reason why we do the querying centrally is:
> a) avoid overloading the brokers
> b) send the same list of partitions (in the same order) to all parallel 
> consumers to do a fixed partition assignments (also across restarts). When we 
> do the querying in the open() method, we need to make sure that all 
> partitions are assigned, without duplicates (also after restarts in case of 
> failures)."
> See also the mailing list discussion: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/API-request-to-submit-job-takes-over-1hr-td7319.html





[jira] [Comment Edited] (FLINK-4069) Kafka Consumer should not initialize on construction

2016-06-13 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15328840#comment-15328840
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4069 at 6/14/16 2:50 AM:
-

Thanks for creating this JIRA Shannon. However, there's already a previously 
opened JIRA on this problem: https://issues.apache.org/jira/browse/FLINK-4023. 
Would you be ok with tracking this issue on FLINK-4023 and close this as a 
duplicate issue? I've referenced a link to this JIRA on FLINK-4023.


was (Author: tzulitai):
Thanks for creating this JIRA Shannon. However, there's already a previously 
opened JIRA on this problem: https://issues.apache.org/jira/browse/FLINK-4023. 
Let's track this issue on FLINK-4023 and close this as a duplicate issue :) 
I've referenced a link to this JIRA on FLINK-4023.

> Kafka Consumer should not initialize on construction
> 
>
> Key: FLINK-4069
> URL: https://issues.apache.org/jira/browse/FLINK-4069
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Shannon Carey
>
> The Kafka Consumer connector currently interacts over the network with Kafka 
> in order to get partition metadata when the class is constructed. Instead, it 
> should do that work when the job actually begins to run (for example, in 
> AbstractRichFunction#open() of FlinkKafkaConsumer0?).
> The main weakness of broker querying in the constructor is that if there are 
> network problems, Flink might take a long time (eg. ~1hr) inside the 
> user-supplied main() method while it attempts to contact each broker and 
> perform retries. In general, setting up the Kafka partitions does not seem 
> strictly necessary as part of execution of main() in order to set up the job 
> plan/topology.
> However, as Robert Metzger mentions, there are important concerns with how 
> Kafka partitions are handled:
> "The main reason why we do the querying centrally is:
> a) avoid overloading the brokers
> b) send the same list of partitions (in the same order) to all parallel 
> consumers to do a fixed partition assignments (also across restarts). When we 
> do the querying in the open() method, we need to make sure that all 
> partitions are assigned, without duplicates (also after restarts in case of 
> failures)."
> See also the mailing list discussion: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/API-request-to-submit-job-takes-over-1hr-td7319.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2016-06-05 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4022:
---
Description: 
Allow users to subscribe to "topic-n*", so that the consumer automatically 
reads from "topic-n1", "topic-n2", ... and so on as they are added to Kafka.

I propose to implement this feature by the following description:

Since the overall list of partitions to read will change after job submission, 
the main change required for this feature will be dynamic partition assignment 
to subtasks while the Kafka consumer is running. This will mainly be 
accomplished using the Kafka 0.9.x API 
`KafkaConsumer#subscribe(java.util.regex.Pattern, ConsumerRebalanceListener)`. 
The KafkaConsumer in each subtask will be added to the same consumer group 
when instantiated, relying on Kafka to dynamically reassign partitions to them 
whenever a rebalance happens. The registered `ConsumerRebalanceListener` is a 
callback that is called right before and after a rebalance. We'll use this 
callback to let each subtask commit the last offsets of the partitions it is 
currently responsible for to an external store (or Kafka) before a rebalance; 
after the rebalance, once each subtask gets the new partitions it will be 
reading from, it reads from the external store to get the last offsets for its 
new partitions (partitions without offset entries in the store are the new 
partitions that caused the rebalance).
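The offset hand-off around a rebalance can be sketched as follows. This is a minimal, self-contained simulation: `RebalanceListener`, `Subtask`, and the in-memory `offsetStore` are hypothetical stand-ins for Kafka's real `ConsumerRebalanceListener` and the external store, so the idea can be shown without the Kafka client library.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: stand-ins for Kafka's ConsumerRebalanceListener and an
// external offset store (e.g. ZK, DynamoDB, or Kafka itself).
public class RebalanceOffsetHandoff {
    // external store: partition -> last committed offset
    static final Map<Integer, Long> offsetStore = new ConcurrentHashMap<>();

    interface RebalanceListener {
        void onPartitionsRevoked(Set<Integer> partitions);   // called before a rebalance
        void onPartitionsAssigned(Set<Integer> partitions);  // called after a rebalance
    }

    // One consumer subtask, tracking offsets of the partitions it currently owns.
    static class Subtask implements RebalanceListener {
        final Map<Integer, Long> currentOffsets = new HashMap<>();

        public void onPartitionsRevoked(Set<Integer> partitions) {
            // commit last offsets of the partitions we were responsible for
            for (int p : partitions) {
                Long off = currentOffsets.remove(p);
                if (off != null) offsetStore.put(p, off);
            }
        }

        public void onPartitionsAssigned(Set<Integer> partitions) {
            // resume newly assigned partitions from the external store; partitions
            // without an entry are the new ones that caused the rebalance
            for (int p : partitions) {
                currentOffsets.put(p, offsetStore.getOrDefault(p, 0L));
            }
        }
    }

    public static void main(String[] args) {
        Subtask a = new Subtask(), b = new Subtask();
        a.onPartitionsAssigned(Set.of(0, 1));
        a.currentOffsets.put(0, 42L);  // subtask a reads partition 0 up to offset 42

        // simulated rebalance: partition 0 moves from subtask a to subtask b
        a.onPartitionsRevoked(Set.of(0));
        b.onPartitionsAssigned(Set.of(0));

        System.out.println(b.currentOffsets.get(0));  // b resumes from a's committed offset
    }
}
```

The key invariant: revocation commits before assignment reads, so no offset is lost across the hand-off.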

The tricky part will be restoring Flink checkpoints when the partition 
assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
the offsets of the partitions they currently hold. Restoring will be a bit 
different in that subtasks might not be assigned the same partitions as in the 
snapshot they are restored with (since we're letting Kafka dynamically assign 
partitions). There will need to be a coordination process where, if a restore 
state exists, all subtasks first commit the offsets they receive (as a result 
of the restore state) to the external store, and then each subtask looks up 
the last offset for the partitions it is holding.

However, if the globally merged restore state feature mentioned by 
[~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
available, then the restore will be simple again, as each subtask has full 
access to previous global state therefore coordination is not required.

I think changing to dynamic partition assignment is also good in the long run 
for handling topic repartitioning.

Overall,

User-facing API changes:

- New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
DeserializationSchema, Properties)
- New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
KeyedDeserializationSchema, Properties)


Implementation changes:

1. Dynamic partition assigning depending on KafkaConsumer#subscribe
- Remove partition list querying from the constructor
- Remove static partition assignment to subtasks in run()
- Instead of using KafkaConsumer#assign() in the fetchers to manually assign 
static partitions, use subscribe() registered with the callback implementation 
explained above.

2. Restoring from checkpointed states
- Snapshotting should remain unchanged
- Restoring requires subtasks to coordinate the restored offsets they hold 
before continuing (unless we are able to have merged restore states).

3. For the previous consumer functionality (consuming from a fixed list of 
topics), KafkaConsumer#subscribe() has a corresponding overload for a fixed 
list of topics. We can simply decide which subscribe() overload to use 
depending on whether a regex Pattern or a list of topics is supplied.
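The overload dispatch can be sketched like this. `StubConsumer` is a hypothetical stand-in that mimics the two `KafkaConsumer#subscribe` overloads (fixed topic list vs. regex pattern) so the selection logic can be shown without the Kafka client library.

```java
import java.util.List;
import java.util.regex.Pattern;

// Sketch only: StubConsumer stands in for KafkaConsumer, which has both a
// subscribe(Collection<String>) and a subscribe(Pattern, ...) overload.
public class SubscribeDispatch {
    static class StubConsumer {
        String mode;
        void subscribe(List<String> topics) { mode = "fixed:" + topics; }
        void subscribe(Pattern pattern)     { mode = "regex:" + pattern.pattern(); }
    }

    // Pick the subscribe() overload depending on what the user supplied
    // to the connector's constructor.
    @SuppressWarnings("unchecked")
    static StubConsumer create(Object topicsOrPattern) {
        StubConsumer consumer = new StubConsumer();
        if (topicsOrPattern instanceof Pattern) {
            consumer.subscribe((Pattern) topicsOrPattern);
        } else {
            consumer.subscribe((List<String>) topicsOrPattern);
        }
        return consumer;
    }

    public static void main(String[] args) {
        System.out.println(create(Pattern.compile("topic-n.*")).mode);
        System.out.println(create(List.of("topic-a", "topic-b")).mode);
    }
}
```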

4. If a subtask doesn't initially have any assigned partitions, it shouldn't 
emit a MAX_VALUE watermark, since it may be assigned partitions after a 
rebalance. Instead, unassigned subtasks should also run a fetcher instance and 
take part in the process pool for the consumer group of the subscribed topics.

  was:
Allow users to subscribe to "topic-n*", so that the consumer automatically 
reads from "topic-n1", "topic-n2", ... and so on as they are added to Kafka.

I propose to implement this feature by the following description:

Since the overall list of partitions to read will change after job submission, 
the main big change required for this feature will be dynamic partition 
assignment to subtasks while the Kafka consumer is running. This will mainly be 
accomplished using Kafka 0.9.x API 
`KafkaConsumer#subscribe(java.util.regex.Pattern, ConsumerRebalanceListener)`. 
Each KafkaConsumers in each subtask will be added to the same consumer group 
when instantiated, and rely on Kafka to dynamically reassign partitions to them 
whenever a rebalance happens. The registered `ConsumerRebalanceListener` is a 
callback that is called right before and after rebalancing happens. We'll use 
this callback to let each subtask 

[jira] [Updated] (FLINK-4020) Remove shard list querying from Kinesis consumer constructor

2016-06-04 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4020:
---
Description: 
Currently FlinkKinesisConsumer queries for the whole list of shards in the 
constructor, forcing the client to be able to access Kinesis as well. This is 
also a drawback for handling Kinesis-side resharding, since we'd want all shard 
listing / shard-to-task assignment / shard-end (result of resharding) handling 
logic to be done independently within the task life cycle methods, with defined 
and definite results.

The main thing to overcome is coordination between parallel subtasks. All 
subtasks will need to retry (due to Amazon's operation rate limits) until all 
subtasks have succeeded. We could probably use either ZK or Amazon DynamoDB 
(user-configurable) for coordinating subtask status.
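The retry-under-rate-limits idea can be sketched as follows. This is illustrative only: `retryWithBackoff` and the flaky "list shards" call are hypothetical; the real connector would call the AWS SDK's shard-listing operation and honor its documented rate limits.

```java
import java.util.concurrent.Callable;

// Sketch only: exponential backoff around a rate-limited operation,
// as each subtask would need when listing shards against Amazon's limits.
public class ShardListRetry {
    static <T> T retryWithBackoff(Callable<T> op, int maxAttempts, long baseMillis)
            throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                if (attempt >= maxAttempts) throw e;
                // exponential backoff between attempts (rate-limit friendly)
                Thread.sleep(baseMillis << (attempt - 1));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // fails twice (simulating throttling), then succeeds
        String shards = retryWithBackoff(() -> {
            if (++calls[0] < 3) throw new RuntimeException("rate limited");
            return "shard-0,shard-1";
        }, 5, 1);
        System.out.println(calls[0] + " " + shards);
    }
}
```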

  was:Currently FlinkKinesisConsumer is querying for the whole list of shards 
in the constructor, forcing the client to be able to access Kinesis as well. 
This is also a drawback for handling Kinesis-side resharding, since we'd want 
all shard listing / shard-to-task assigning / shard end (result of resharding) 
handling logic to be capable of being independently done within task life cycle 
methods, with defined and definite results.


> Remove shard list querying from Kinesis consumer constructor
> 
>
> Key: FLINK-4020
> URL: https://issues.apache.org/jira/browse/FLINK-4020
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>
> Currently FlinkKinesisConsumer is querying for the whole list of shards in 
> the constructor, forcing the client to be able to access Kinesis as well. 
> This is also a drawback for handling Kinesis-side resharding, since we'd want 
> all shard listing / shard-to-task assigning / shard end (result of 
> resharding) handling logic to be capable of being independently done within 
> task life cycle methods, with defined and definite results.
> Main thing to overcome is coordination between parallel subtasks. All 
> subtasks will need to retry (due to Amazon's operation rate limits) until all 
> subtasks have succeeded. We could probably use either ZK or Amazon DynamoDB 
> (user configurable) for coordinating subtask status.





[jira] [Updated] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-04 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3231:
---
Affects Version/s: (was: 1.0.0)
   1.1.0

> Handle Kinesis-side resharding in Kinesis streaming consumer
> 
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time for adjustable 
> stream throughput capacity. This article explains this quite clearly: 
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
> mapping is done in a simple round-robin-like distribution which can be 
> locally determined at each Flink consumer task (Flink Kafka consumer does 
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer 
> tasks coordinate which shards they are currently handling, and allow a task 
> to ask the coordinator for a shard reassignment when it finds a closed shard 
> at runtime (shards are closed by Kinesis when they are merged or split).
> We need a centralized coordinator state store which is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine which 
> shards they can be reassigned. Amazon KCL uses a DynamoDB table for the 
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
> KCL for the implementation of the consumer if we want to leverage Flink's 
> checkpointing mechanics. For our own implementation, Zookeeper can be used 
> for this state store, but that means it would require the user to set up ZK 
> to work.
> Since this feature introduces extensive work, it is opened as a separate 
> sub-task from the basic implementation 
> https://issues.apache.org/jira/browse/FLINK-3229.
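The round-robin-like static mapping mentioned in the quoted description (the one that resharding breaks) can be sketched as follows; the method name is hypothetical, but the modulo rule is the locally determinable distribution described above.

```java
// Sketch only: a locally determinable round-robin shard-to-subtask mapping,
// the static assignment scheme that Kinesis resharding breaks.
public class ShardAssignment {
    // A subtask claims shard i iff i % parallelism == its own index;
    // every subtask can compute this without any coordination.
    static boolean isAssignedTo(int shardIndex, int subtaskIndex, int parallelism) {
        return shardIndex % parallelism == subtaskIndex;
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        for (int shard = 0; shard < 5; shard++) {
            for (int task = 0; task < 2; task++) {
                if (isAssignedTo(shard, task, 2)) {
                    sb.append(shard).append("->").append(task).append(" ");
                }
            }
        }
        System.out.println(sb.toString().trim());
    }
}
```

Once shards are merged or split, the shard indices and count change, so this purely local rule no longer yields a stable assignment, which is why the coordinator store is needed.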





[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-04 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15315494#comment-15315494
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3231:


Hi Stephan,

Finally reached the point where we can start working on this. I'm currently 
in the design phase for this feature.

Merged state on restore sounds interesting (is there currently any doc / 
thread / JIRA tracking this merged state feature?).

However, I don't think it will be able to fully solve the described problem for 
this JIRA, unless we are expecting the streaming job to fail and restore every 
time resharding happens. We still need coordination between the subtasks to 
gracefully handle the resharding. 

On the other hand, if the merged state feature also provides access (including 
incomplete checkpoints) from subtasks during job execution, then it might be 
possible to figure out an implementation.

> Handle Kinesis-side resharding in Kinesis streaming consumer
> 
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time for adjustable 
> stream throughput capacity. This article explains this quite clearly: 
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
> mapping is done in a simple round-robin-like distribution which can be 
> locally determined at each Flink consumer task (Flink Kafka consumer does 
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer 
> tasks coordinate which shards they are currently handling, and allow a task 
> to ask the coordinator for a shard reassignment when it finds a closed shard 
> at runtime (shards are closed by Kinesis when they are merged or split).
> We need a centralized coordinator state store which is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine which 
> shards they can be reassigned. Amazon KCL uses a DynamoDB table for the 
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
> KCL for the implementation of the consumer if we want to leverage Flink's 
> checkpointing mechanics. For our own implementation, Zookeeper can be used 
> for this state store, but that means it would require the user to set up ZK 
> to work.
> Since this feature introduces extensive work, it is opened as a separate 
> sub-task from the basic implementation 
> https://issues.apache.org/jira/browse/FLINK-3229.





[jira] [Comment Edited] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-04 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15315494#comment-15315494
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3231 at 6/4/16 1:04 PM:


Hi [~StephanEwen],

Finally reached the point where we can start working on this. I'm currently 
in the design phase for this feature.

Merged state on restore sounds interesting (is there currently any doc / 
thread / JIRA tracking this merged state feature?).

However, I don't think it will be able to fully solve the described problem for 
this JIRA, unless we are expecting the streaming job to fail and restore every 
time resharding happens. We still need coordination between the subtasks to 
gracefully handle the resharding. 

On the other hand, if the merged state feature also provides access (including 
incomplete checkpoints) from subtasks during job execution, then it might be 
possible to figure out an implementation.


was (Author: tzulitai):
Hi Stephan,

Finally reached the point where we could start working on this. I'm currently 
in the designing phase for this feature.

Merged state on restore sounds interesting (is there currently any doc / thread 
/ JIRA that's issuing this merged states feature?).

However, I don't think it will be able to fully solve the described problem for 
this JIRA, unless we are expecting the streaming job to fail and restore every 
time resharding happens. We still need coordination between the subtasks to 
gracefully handle the resharding. 

On the other hand, if the merged state feature also provides access (including 
incomplete checkpoints) from subtasks during job execution, then it might be 
possible to figure out an implementation.

> Handle Kinesis-side resharding in Kinesis streaming consumer
> 
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time for adjustable 
> stream throughput capacity. This article explains this quite clearly: 
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
> mapping is done in a simple round-robin-like distribution which can be 
> locally determined at each Flink consumer task (Flink Kafka consumer does 
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer 
> tasks coordinate which shards they are currently handling, and allow a task 
> to ask the coordinator for a shard reassignment when it finds a closed shard 
> at runtime (shards are closed by Kinesis when they are merged or split).
> We need a centralized coordinator state store which is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine which 
> shards they can be reassigned. Amazon KCL uses a DynamoDB table for the 
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
> KCL for the implementation of the consumer if we want to leverage Flink's 
> checkpointing mechanics. For our own implementation, Zookeeper can be used 
> for this state store, but that means it would require the user to set up ZK 
> to work.
> Since this feature introduces extensive work, it is opened as a separate 
> sub-task from the basic implementation 
> https://issues.apache.org/jira/browse/FLINK-3229.





[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-06-05 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15316225#comment-15316225
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4019:


Good suggestion! For now, the connector doesn't allow users to supply 
timestamp / watermark assigners. Perhaps we should expand the scope of this 
JIRA to include that as well?

> Expose approximateArrivalTimestamp through the KinesisDeserializationSchema 
> interface
> -
>
> Key: FLINK-4019
> URL: https://issues.apache.org/jira/browse/FLINK-4019
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>
> Amazon's Record class also gives information about the timestamp of when 
> Kinesis successfully receives the record: 
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html#getApproximateArrivalTimestamp().
> This should be useful info for users and should be exposed through the 
> deserialization schema.
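One way to expose the arrival timestamp is an extra parameter on the deserialization callback. The interface below is a hypothetical sketch (the actual KinesisDeserializationSchema signature may differ), shown with a trivial string implementation.

```java
import java.nio.charset.StandardCharsets;

// Sketch only: a hypothetical deserialization interface that additionally
// passes Kinesis' approximate arrival timestamp to user code.
public class ArrivalTimestampSchema {
    interface KinesisDeserializationSchema<T> {
        T deserialize(byte[] record, String partitionKey, long approxArrivalTimestamp);
    }

    public static void main(String[] args) {
        // Example implementation that tags each string record with its arrival time.
        KinesisDeserializationSchema<String> schema =
            (record, key, ts) -> new String(record, StandardCharsets.UTF_8) + "@" + ts;

        System.out.println(schema.deserialize(
            "hello".getBytes(StandardCharsets.UTF_8), "pk-1", 1465000000000L));
    }
}
```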





[jira] [Created] (FLINK-3989) Add guide to write a custom streaming source / sink in DataStream API documentation

2016-05-28 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-3989:
--

 Summary: Add guide to write a custom streaming source / sink in 
DataStream API documentation
 Key: FLINK-3989
 URL: https://issues.apache.org/jira/browse/FLINK-3989
 Project: Flink
  Issue Type: Task
  Components: Documentation
Affects Versions: 1.1.0
Reporter: Tzu-Li (Gordon) Tai


I think it's a good idea to add a basic outline in the documentation for users 
to compose their own streaming connector sources / sinks, as it's quite a 
frequently asked question on the mailing lists and might help Flink gain 
traction with new external systems faster.

The documentation should contain information about what classes to extend, what 
life cycle methods to implement, how to implement an exactly once source, etc.





[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-05-28 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15305760#comment-15305760
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


[~rmetzger]
Hi Robert,
I'm back and ready to continue enhancing the Kinesis connector. Thank you for 
the merge on PR #1911.
Just a check before working on the new issues: are we still going for the 
approach of removing both KPL & KCL from the connector and using the basic SDK 
only? I have WIP for this: 
https://github.com/tzulitai/flink/tree/FLINK-3229-RemoveKclKplRework (not yet 
rebased on the recent changes)

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using KCL, which offers a higher-level 
> of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB "lease table" as the checkpoint state 
> storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we can not use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly once guarantees (KCL promises only 
> at-least-once). Therefore, AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and also the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outlines described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to Kinesis vs. Kafka differences are 
> described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector having per broker connections where the connections 
> can handle multiple Kafka partitions, the Kinesis connector will only need to 
> have simple per shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector with Kafka / ZK to 
> sync outside view of progress, we probably could use ZK or DynamoDB like the 
> way KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straightforward, being almost 
> if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> 

[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-05-28 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15305762#comment-15305762
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3229:


Great working with the community :) Thanks for merging!

> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> --
>
> Key: FLINK-3229
> URL: https://issues.apache.org/jira/browse/FLINK-3229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Opening a sub-task to implement the data source consumer for the Kinesis 
> streaming connector (https://issues.apache.org/jira/browse/FLINK-3211).
> An example of the planned user API for Flink Kinesis Consumer:
> {code}
> Properties kinesisConfig = new Properties();
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, 
> "BASIC");
> kinesisConfig.put(
> KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, 
> "aws_access_key_id_here");
> kinesisConfig.put(
> KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
> "aws_secret_key_here");
> kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, 
> "LATEST"); // or TRIM_HORIZON
> DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
> "kinesis_stream_name",
> new SimpleStringSchema(),
> kinesisConfig));
> {code}





[jira] [Updated] (FLINK-3989) Add guide to write a custom streaming connector in DataStream API documentation

2016-05-28 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3989:
---
Summary: Add guide to write a custom streaming connector in DataStream API 
documentation  (was: Add guide to write a custom streaming source / sink in 
DataStream API documentation)

> Add guide to write a custom streaming connector in DataStream API 
> documentation
> ---
>
> Key: FLINK-3989
> URL: https://issues.apache.org/jira/browse/FLINK-3989
> Project: Flink
>  Issue Type: Task
>  Components: Documentation
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>
> I think it's a good idea to add a basic outline in the documentation for 
> users to compose their own streaming connector sources / sinks, as it's quite 
> a frequently asked question on the mailing lists and might help Flink gain 
> traction with new external systems faster.
> The documentation should contain information about what classes to extend, 
> what life cycle methods to implement, how to implement an exactly once 
> source, etc.





[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-05-28 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15305768#comment-15305768
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


I just noticed the conversation on PR #2016. Let's wait until we decide 
whether to drop the dependency ;)

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using KCL, which offers a higher-level 
> of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB "lease table" as the checkpoint state 
> storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we can not use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly once guarantees (KCL promises only 
> at-least-once). Therefore, AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and also the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outlines described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to Kinesis vs. Kafka differences are 
> described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector having per broker connections where the connections 
> can handle multiple Kafka partitions, the Kinesis connector will only need to 
> have simple per shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector with Kafka / ZK to 
> sync outside view of progress, we probably could use ZK or DynamoDB like the 
> way KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straightforward, being almost 
> if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998
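A minimal sketch of the checkpointing idea in point 3 above: instead of committing offsets to an external store, the consumer can keep the last read sequence number per shard and hand an immutable copy to Flink's checkpoint mechanism. Class and method names here are illustrative, not the actual connector API.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: tracks per-shard progress so Flink's distributed
// snapshots can checkpoint it, instead of committing offsets back to Kinesis.
public class ShardProgressState {
    private final Map<String, String> lastSeqNumPerShard = new HashMap<>();

    // Called after each successfully read record.
    public void advance(String shardId, String sequenceNumber) {
        lastSeqNumPerShard.put(shardId, sequenceNumber);
    }

    // Called by the checkpointing mechanism: the copy becomes part of the snapshot.
    public Map<String, String> snapshot() {
        return new HashMap<>(lastSeqNumPerShard);
    }

    // On restore, reads resume from the checkpointed sequence numbers.
    public void restore(Map<String, String> snapshotState) {
        lastSeqNumPerShard.clear();
        lastSeqNumPerShard.putAll(snapshotState);
    }
}
```

On recovery, each consumer task would restart its per-shard reads from the sequence numbers in the restored snapshot, which is exactly what Kinesis' read-from-sequence-number capability enables.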



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3989) Add guide to write a custom streaming connector in DataStream API documentation

2016-05-31 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3989:
---
Assignee: Tzu-Li (Gordon) Tai

> Add guide to write a custom streaming connector in DataStream API 
> documentation
> ---
>
> Key: FLINK-3989
> URL: https://issues.apache.org/jira/browse/FLINK-3989
> Project: Flink
>  Issue Type: Task
>  Components: Documentation
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> I think it's a good idea to add a basic outline in the documentation for 
> users to compose their own streaming connector sources / sinks, as it's quite a 
> frequently asked question on the mailing lists and might help Flink 
> gain traction with new external systems faster.
> The documentation should contain information about what classes to extend, 
> what lifecycle methods to implement, how to implement an exactly-once 
> source, etc.





[jira] [Commented] (FLINK-3989) Add guide to write a custom streaming connector in DataStream API documentation

2016-05-31 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15307359#comment-15307359
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3989:


I'll be getting a first draft out within the next 2-3 days. Feel free to comment 
here on any information you think should be in the guide.

> Add guide to write a custom streaming connector in DataStream API 
> documentation
> ---
>
> Key: FLINK-3989
> URL: https://issues.apache.org/jira/browse/FLINK-3989
> Project: Flink
>  Issue Type: Task
>  Components: Documentation
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>
> I think it's a good idea to add a basic outline in the documentation for 
> users to compose their own streaming connector sources / sinks, as it's quite a 
> frequently asked question on the mailing lists and might help Flink 
> gain traction with new external systems faster.
> The documentation should contain information about what classes to extend, 
> what lifecycle methods to implement, how to implement an exactly-once 
> source, etc.





[jira] [Commented] (FLINK-3954) Installing Flink on EMR using EMR bootstrap action

2016-05-31 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15307388#comment-15307388
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3954:


The functionality is actually being worked on at the moment in this JIRA: 
https://issues.apache.org/jira/browse/FLINK-1337

Also, since this is a question, would you be okay with closing this issue? The 
community will happily answer questions you have on the Flink mailing lists.

> Installing Flink on EMR using EMR bootstrap action
> --
>
> Key: FLINK-3954
> URL: https://issues.apache.org/jira/browse/FLINK-3954
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.2
> Environment: AWS EMR 4.6
>Reporter: Akshay Shingote
>
> Hello, I want to know whether Flink can be installed on AWS EMR through EMR's 
> bootstrap action. This would help get Flink installed on EMR's master & 
> core nodes. So is there any way we can install Flink on AWS EMR 
> through a bootstrap action?





[jira] [Commented] (FLINK-4008) Hello,I am not able to create jar of flink-streaming-connectors ...I am able to create jar of others like twitter,kafka,flume but I am not able to create jar of flink-s

2016-06-02 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311890#comment-15311890
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4008:


Hi Akshay,

flink-streaming-connectors wasn't intended to be a Maven jar, but rather an 
aggregator project to hold the various connectors.
You can't build jars from Maven aggregator projects. To do so would 
require non-trivial changes to the Maven build infrastructure for the streaming 
connectors component.

May I ask what your use case is and why you intend to do so? Also, questions on Flink 
are usually asked via the dev / user mailing lists, and not on JIRA / PRs.
Are you okay with closing the PR and JIRA?

Thanks,
Gordon


> Hello,I am not able to create jar of flink-streaming-connectors ...I am able 
> to create jar of others like twitter,kafka,flume but I am not able to create 
> jar of flink-streaming connectors ?? How can I create this jar ??
> ---
>
> Key: FLINK-4008
> URL: https://issues.apache.org/jira/browse/FLINK-4008
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Akshay Shingote
>
> I filed an issue here: https://github.com/apache/flink/pull/2058 ... I want to 
> know how we can create a jar of flink-streaming-connectors.





[jira] [Commented] (FLINK-3989) Add guide to write a custom streaming connector in DataStream API documentation

2016-05-31 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15307552#comment-15307552
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3989:


Just realized that the javadocs in the Java Streaming API have quite good and 
informative descriptions for implementing sources / sinks from the base classes. 
I'll try to integrate this existing content into a single documentation page as 
the first draft.

> Add guide to write a custom streaming connector in DataStream API 
> documentation
> ---
>
> Key: FLINK-3989
> URL: https://issues.apache.org/jira/browse/FLINK-3989
> Project: Flink
>  Issue Type: Task
>  Components: Documentation
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> I think it's a good idea to add a basic outline in the documentation for 
> users to compose their own streaming connector sources / sinks, as it's quite a 
> frequently asked question on the mailing lists and might help Flink 
> gain traction with new external systems faster.
> The documentation should contain information about what classes to extend, 
> what lifecycle methods to implement, how to implement an exactly-once 
> source, etc.





[jira] [Updated] (FLINK-4018) Configurable idle time between getRecords requests to Kinesis shards

2016-06-16 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4018:
---
Fix Version/s: 1.1.0

> Configurable idle time between getRecords requests to Kinesis shards
> 
>
> Key: FLINK-4018
> URL: https://issues.apache.org/jira/browse/FLINK-4018
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Currently, the Kinesis consumer is calling getRecords() right after finishing 
> previous calls. This results in easily reaching Amazon's limitation of 5 GET 
> requests per shard per second. Although the code already has a backoff & retry 
> mechanism, this will affect other applications consuming from the same 
> Kinesis stream.
> Along with this new configuration and the already existing 
> `KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET`, users will have more 
> control over the desired throughput behaviour of the Kinesis consumer.
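A rough sketch of what such a configurable idle time could look like. The property name and loop shape here are illustrative stand-ins, not the connector's actual code; the supplier stands in for one getRecords() call.

```java
import java.util.Properties;
import java.util.function.IntSupplier;

// Illustrative sketch: sleep a configurable interval between getRecords() calls
// so a single consumer stays under Kinesis' 5 GET requests per shard per second.
public class ShardPollLoop {
    // Hypothetical config key, not necessarily the real KinesisConfigConstants name.
    public static final String GETRECORDS_INTERVAL_MILLIS =
        "flink.shard.getrecords.intervalmillis";

    private final long intervalMillis;
    private final IntSupplier fetchOnce; // stand-in for one getRecords() call; returns record count

    public ShardPollLoop(Properties config, IntSupplier fetchOnce) {
        this.intervalMillis =
            Long.parseLong(config.getProperty(GETRECORDS_INTERVAL_MILLIS, "0"));
        this.fetchOnce = fetchOnce;
    }

    // Performs the given number of fetches, idling between consecutive calls.
    public int run(int fetches) throws InterruptedException {
        int total = 0;
        for (int i = 0; i < fetches; i++) {
            total += fetchOnce.getAsInt();
            if (intervalMillis > 0 && i < fetches - 1) {
                Thread.sleep(intervalMillis);
            }
        }
        return total;
    }
}
```

Setting the interval above 200 ms would keep a single consumer below 5 requests per shard per second, leaving headroom for other applications reading the same stream.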



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4020) Remove shard list querying from Kinesis consumer constructor

2016-06-16 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4020:
---
Fix Version/s: 1.1.0

> Remove shard list querying from Kinesis consumer constructor
> 
>
> Key: FLINK-4020
> URL: https://issues.apache.org/jira/browse/FLINK-4020
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Currently FlinkKinesisConsumer is querying for the whole list of shards in 
> the constructor, forcing the client to be able to access Kinesis as well. 
> This is also a drawback for handling Kinesis-side resharding, since we'd want 
> all shard listing / shard-to-task assigning / shard end (result of 
> resharding) handling logic to be done independently within 
> task lifecycle methods, with defined and definite results.
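One way to picture the proposed change, as a hedged sketch: shard discovery moves from the client-side constructor into a task lifecycle method, so the client submitting the job never needs Kinesis access. Names are illustrative, and the shard-listing supplier is a stand-in for the real Kinesis call.

```java
import java.util.List;
import java.util.function.Supplier;

// Illustrative sketch: the constructor runs on the client submitting the job and
// performs no external calls; shard discovery happens in open(), which runs on
// the task manager where Kinesis access is actually available.
public class DeferredDiscoverySource {
    private final Supplier<List<String>> shardLister; // stand-in for a Kinesis listShards call
    private List<String> discoveredShards;            // populated only at task startup

    public DeferredDiscoverySource(Supplier<List<String>> shardLister) {
        this.shardLister = shardLister; // no Kinesis access here
    }

    // Task lifecycle method, in the spirit of Flink's open().
    public void open() {
        discoveredShards = shardLister.get();
    }

    public List<String> getDiscoveredShards() {
        if (discoveredShards == null) {
            throw new IllegalStateException("open() has not been called yet");
        }
        return discoveredShards;
    }
}
```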





[jira] [Updated] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-16 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3231:
---
Fix Version/s: 1.1.0

> Handle Kinesis-side resharding in Kinesis streaming consumer
> 
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time for adjustable 
> stream throughput capacity. This article explains this quite clearly: 
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
> mapping is done in a simple round-robin-like distribution which can be 
> locally determined at each Flink consumer task (Flink Kafka consumer does 
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer 
> tasks coordinate which shards they are currently handling, and allow the 
> tasks to ask the coordinator for a shard reassignment when a task 
> finds a closed shard at runtime (shards are closed by Kinesis 
> when they are merged or split).
> We need a centralized coordinator state store which is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine which 
> shards they can be reassigned.
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
> KCL for the implementation of the consumer if we want to leverage Flink's 
> checkpointing mechanics. For our own implementation, Zookeeper can be used 
> for this state store, but that would require the user to set up ZK 
> as well.
> Since this feature introduces extensive work, it is opened as a separate 
> sub-task from the basic implementation 
> https://issues.apache.org/jira/browse/FLINK-3229.
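The static round-robin shard-to-task mapping that resharding breaks can be sketched as follows. This is a simplified illustration, not the connector's actual code: every subtask computes the same answer locally from the same ordered shard list, with no coordination needed.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of a locally determinable, static round-robin assignment.
public class StaticShardAssigner {
    public static List<String> shardsForSubtask(List<String> orderedShardIds,
                                                int subtaskIndex, int parallelism) {
        List<String> assigned = new ArrayList<>();
        for (int i = 0; i < orderedShardIds.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                assigned.add(orderedShardIds.get(i));
            }
        }
        return assigned;
    }
}
```

This only works while the shard list is fixed; once Kinesis closes or adds shards, the lists seen before and after a reshard differ, which is why a coordinated reassignment mechanism is needed.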





[jira] [Updated] (FLINK-3924) Remove protobuf shading from Kinesis connector

2016-06-17 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3924:
---
Fix Version/s: 1.1.0

> Remove protobuf shading from Kinesis connector
> --
>
> Key: FLINK-3924
> URL: https://issues.apache.org/jira/browse/FLINK-3924
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
>
> The Kinesis connector is currently creating a fat jar with a custom protobuf 
> version (2.6.1), relocated into a different package.
> We need to build the fat jar to change the protobuf calls from the original 
> protobuf to the relocated one.
> Because Kinesis is licensed under the Amazon Software License (which is not 
> entirely compatible with the ASL 2.0), I don't want to deploy Kinesis connector 
> binaries to Maven central with the releases. These binaries would contain code 
> from Amazon. It would be more than just linking to an (optional) dependency.





[jira] [Assigned] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-06-16 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-4019:
--

Assignee: Tzu-Li (Gordon) Tai

> Expose approximateArrivalTimestamp through the KinesisDeserializationSchema 
> interface
> -
>
> Key: FLINK-4019
> URL: https://issues.apache.org/jira/browse/FLINK-4019
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Amazon's Record class also gives information about the timestamp of when 
> Kinesis successfully receives the record: 
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html#getApproximateArrivalTimestamp().
> This should be useful info for users and should be exposed through the 
> deserialization schema.
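One possible shape for exposing the timestamp, as a hedged sketch: the approximate arrival timestamp simply becomes one more argument to deserialize(). The interface name and signature here are illustrative; the actual KinesisDeserializationSchema API may differ.

```java
import java.nio.charset.StandardCharsets;

// Illustrative sketch: pass the approximate arrival timestamp through to the
// user's deserializer alongside the record bytes and metadata.
public class TimestampedDeserializationDemo {
    interface TimestampedDeserializationSchema<T> {
        T deserialize(byte[] recordValue, String partitionKey, String seqNum,
                      long approxArrivalTimestampMillis, String stream, String shardId);
    }

    // Example schema that tags the decoded value with its arrival timestamp.
    static final TimestampedDeserializationSchema<String> STRING_SCHEMA =
        (value, key, seq, ts, stream, shard) ->
            new String(value, StandardCharsets.UTF_8) + "@" + ts;
}
```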





[jira] [Commented] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-06-16 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15333858#comment-15333858
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4019:


Ok. I'll add this then along with the upcoming PR for reshard handling.

> Expose approximateArrivalTimestamp through the KinesisDeserializationSchema 
> interface
> -
>
> Key: FLINK-4019
> URL: https://issues.apache.org/jira/browse/FLINK-4019
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Amazon's Record class also gives information about the timestamp of when 
> Kinesis successfully receives the record: 
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html#getApproximateArrivalTimestamp().
> This should be useful info for users and should be exposed through the 
> deserialization schema.





[jira] [Updated] (FLINK-4019) Expose approximateArrivalTimestamp through the KinesisDeserializationSchema interface

2016-06-16 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4019:
---
Fix Version/s: 1.1.0

> Expose approximateArrivalTimestamp through the KinesisDeserializationSchema 
> interface
> -
>
> Key: FLINK-4019
> URL: https://issues.apache.org/jira/browse/FLINK-4019
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Amazon's Record class also gives information about the timestamp of when 
> Kinesis successfully receives the record: 
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/Record.html#getApproximateArrivalTimestamp().
> This should be useful info for users and should be exposed through the 
> deserialization schema.





[jira] [Updated] (FLINK-3211) Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics

2016-01-10 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3211:
---
Summary: Add AWS Kinesis streaming connector with deep integration with 
Flink's checkpointing mechanics  (was: Add AWS Kinesis streaming connector)

> Add AWS Kinesis streaming connector with deep integration with Flink's 
> checkpointing mechanics
> --
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great way to reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using the KCL, which offers a higher 
> level of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB table as the checkpoint state storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we cannot use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly once guarantees (KCL promises only 
> at-least-once). Therefore, AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and also the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outlines described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to some of the Kinesis vs. Kafka 
> differences are described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector having per broker connections where the connections 
> can handle multiple Kafka partitions, the Kinesis connector will only need to 
> have simple per shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector with Kafka / ZK to 
> sync outside view of progress, we probably could use ZK or DynamoDB like the 
> way KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straightforward, being almost 
> if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998





[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics

2016-01-10 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15090974#comment-15090974
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 1/10/16 1:30 PM:
-

Re-clarification of why using the KCL is not viable if we want a deep 
integration of Kinesis with Flink's checkpointing & distributed snapshot 
mechanics for exactly-once guarantees:

After a closer look at KCL's documentation and its source code, I realized that 
KCL's own design of parallel consumption of Kinesis data works pretty much like 
our Flink's Kafka connector as a whole: each instance of a KCL application is a 
"worker", which consumes records from multiple Kinesis "shards (partitions)" 
that are assigned to it in parallel threads. KCL handles which shards are 
assigned to which workers, and can do this dynamically in a sense that when new 
worker instances are discovered, shard-to-worker assignments can be reassigned 
at runtime.

Therefore, it seems reasonable to view all Flink consumer tasks of the Kinesis 
connector as the instance pool of a single KCL application. The implementation 
will simply be to instantiate and run a KCL worker on each consumer task. KCL 
can handle the shard-to-worker assignment after each consumer task starts 
running.

Unfortunately, there are a few shortcomings if we use KCL:

1. The method to access the shard infos assigned to a Worker instance has 
private access.
2. The logic of where a Worker instance continues reading from a shard after 
restore is deep in the KCL code and cannot be externally configured. It is 
hardcoded to read from the KCL-managed checkpoint table (the DynamoDB "lease 
table").

These two problems eventually led me to the conclusion that it isn't possible 
for KCL to work with Flink's checkpoint and restore mechanics. I've been 
looking around the KCL source code for hacky ways to bypass the above issues, but 
have come up short in finding any solutions.

Therefore, AWS SDK is the way to go. Although I will have to do quite a bit of 
work that the KCL has already covered, given that this would make Flink the only 
processing engine with built-in exactly-once guarantees for Kinesis currently 
available, I think it is worth the effort. Tasks that we will have 
to work on include:

Task 1: Shard-to-task assignment for parallel data consumption.
Task 2: Assigning multiple streams to the consumer (this is actually still a 
wish-list feature for KCL [1])
Task 3: Handling Kinesis-side resharding when Kinesis shards are merged or 
split.
Task 4: If in the future Flink can support dynamic reconfiguration of task 
parallelism (maybe for higher throughput), we will also need to handle Flink-side 
parallelism changes.

I propose that the scope of this JIRA focuses only on tasks 1 and 2. A lot of 
parts can be based on the Kafka connector implementation.
Task 3 includes a lot of extensive work which isn't necessary for Kinesis to 
work properly with Flink, and is out of the scope of this ticket.
Task 4 is blocked by the availability of dynamic task parallelism 
reconfiguration at runtime.

Ref:
[1] https://github.com/awslabs/amazon-kinesis-client-nodejs/issues/4


was (Author: tzulitai):
Re-clarification of why using the KCL is not viable if we want a deep 
integration of Kinesis with Flink's checkpointing & distributed snapshot 
mechanics for exactly-once guarantees:

After a closer look at KCL's documentation and its source code, I realized that 
KCL's own design of parallel consumption of Kinesis data works pretty much like 
our Flink's Kafka connector as a whole: each instance of a KCL application is a 
"worker", which consumes records from multiple Kinesis "shards (partitions)" 
that are assigned to it in parallel threads. KCL handles which shards are 
assigned to which workers, and can do this dynamically in a sense that when new 
worker instances are discovered, shard-to-worker assignments can be reassigned 
at runtime.

Therefore, it seems reasonable to view all Flink consumer tasks of the Kinesis 
connector as the instance pool of a single KCL application. The implementation 
will simply be to instantiate and run a KCL worker on each consumer task. KCL 
can handle the shard-to-worker assignment after each consumer task starts 
running.

Unfortunately, there are a few shortcomings if we use KCL:

1. The method to access the shards assigned to a Worker instance has private 
access.
2. The logic of where a Worker instance continues reading from a shard after 
restore is deep in the KCL code and cannot be externally configured. It is 
hardcoded to read from the KCL-managed checkpoint table (the DynamoDB "lease 
table").

These two problems eventually led me to the conclusion that it isn't possible 
for KCL to work with Flink's checkpoint and restore mechanics. I've been 
looking around KCL source code for hacky ways to bypass the above issues, but 
have come 

[jira] [Updated] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-10 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3211:
---
Summary: Add AWS Kinesis streaming connector  (was: Add AWS Kinesis 
streaming connector with deep integration with Flink's checkpointing mechanics)

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great way to reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using the KCL, which offers a higher 
> level of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB "lease table" as the checkpoint state 
> storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we cannot use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly once guarantees (KCL promises only 
> at-least-once). Therefore, AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and also the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outlines described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to some of the Kinesis vs. Kafka 
> differences are described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector having per broker connections where the connections 
> can handle multiple Kafka partitions, the Kinesis connector will only need to 
> have simple per shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector with Kafka / ZK to 
> sync outside view of progress, we probably could use ZK or DynamoDB like the 
> way KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straightforward, being almost 
> if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998





[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics

2016-01-10 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15090974#comment-15090974
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


Re-clarification of why using the KCL is not viable if we want a deep 
integration of Kinesis with Flink's checkpointing & distributed snapshot 
mechanics for exactly-once guarantees:

After a closer look at KCL's documentation and its source code, I realized 
that KCL's own design for parallel consumption of Kinesis data works much 
like our Flink Kafka connector as a whole: each instance of a KCL application 
is a "worker", which consumes records from the multiple Kinesis "shards" 
(partitions) assigned to it in parallel threads. KCL decides which shards are 
assigned to which workers, and can do this dynamically: when new worker 
instances are discovered, shard-to-worker assignments can be reassigned at 
runtime.

Therefore, it seems reasonable to view all Flink consumer tasks of the 
Kinesis connector as the instance pool of a single KCL application. The 
implementation would simply instantiate and run a KCL worker on each consumer 
task, and KCL would handle the shard-to-worker assignment after each consumer 
task starts running.

Unfortunately, there are a few shortcomings if we use KCL:

1. The method for accessing the shards assigned to a Worker instance has 
private access.
2. The logic determining where a Worker instance continues reading from a 
shard after restore is deep in the KCL code and cannot be externally 
configured. It is hardcoded to read from the KCL-managed checkpoint table 
(the DynamoDB "lease table").

These two problems eventually led me to the conclusion that it isn't possible 
for KCL to work with Flink's checkpoint and restore mechanics. I've been 
looking through the KCL source code for ways to bypass the above issues, but 
have come up short of any workable solution.

Therefore, the AWS SDK is the way to go. Although I will have to redo quite a 
bit of work that the KCL already covers, given that the result would be the 
only stream processing engine currently offering built-in exactly-once 
guarantees for Kinesis, I think it is worth the effort. Tasks that we will 
have to work on include:

Task 1: Shard-to-task assignment for parallel data consumption.
Task 2: Assigning multiple streams to the consumer (this is actually a 
wish-list feature for KCL [1]).
Task 3: Handling Kinesis-side resharding when Kinesis shards are merged or 
split.
Task 4: If Flink eventually supports dynamic reconfiguration of task 
parallelism (e.g. for higher throughput), we will also need to handle 
Flink-side parallelism changes.

I propose that the scope of this JIRA focuses only on tasks 1 and 2. 
Task 3 requires extensive work that isn't necessary for Kinesis to work 
properly with Flink, and is out of the scope of this ticket.
Task 4 is blocked by the availability of dynamic task parallelism 
reconfiguration at runtime.
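
Since shortcoming 2 rules out the KCL lease table, the connector has to carry 
its restore position itself. That state boils down to a per-shard map of 
last-processed sequence numbers that is copied at checkpoint time. A minimal 
sketch, with illustrative class and method names rather than the eventual 
Flink API:

```java
import java.util.HashMap;
import java.util.Map;

// Hedged sketch of the consumer state Flink would snapshot: the last
// processed sequence number per shard. On restore, reads resume right after
// the checkpointed sequence numbers instead of from the KCL lease table.
public class KinesisShardState {
    private final Map<String, String> lastSequenceNumbers = new HashMap<>();

    /** Record that a shard has advanced to the given sequence number. */
    public void advance(String shardId, String sequenceNumber) {
        lastSequenceNumbers.put(shardId, sequenceNumber);
    }

    /** Taken at checkpoint time; must be a copy so later reads don't mutate it. */
    public Map<String, String> snapshot() {
        return new HashMap<>(lastSequenceNumbers);
    }

    /** Applied on recovery before the read loops restart. */
    public void restore(Map<String, String> state) {
        lastSequenceNumbers.clear();
        lastSequenceNumbers.putAll(state);
    }
}
```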

> Add AWS Kinesis streaming connector with deep integration with Flink's 
> checkpointing mechanics
> --
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to Flink's handful of streaming connectors to external 
> systems and a great way to reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. The 
> AWS SDK can be used to consume Kinesis data, including reading a stream from 
> a specific offset (or "record sequence number" in Kinesis terminology). On 
> the other hand, AWS officially recommends using KCL, which offers a higher 
> level of abstraction that also comes with checkpointing and failure 
> recovery, using a KCL-managed AWS DynamoDB table as the checkpoint state 
> storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) assignment and 
> checkpointing to allow the user to focus only on streaming application 
> logic. This means we cannot use the KCL to implement the Kinesis streaming 
> connector if we are aiming for a deep integration of Flink with Kinesis that 
> provides exactly-once guarantees (KCL promises only at-least-once). 
> Therefore, AWS SDK will be the way to go for the implementation of this 
> feature.

[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics

2016-01-10 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15090974#comment-15090974
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 1/10/16 12:03 PM:
--

Re-clarification of why using the KCL is not viable if we want a deep 
integration of Kinesis with Flink's checkpointing & distributed snapshot 
mechanics for exactly-once guarantees:

After a closer look at KCL's documentation and its source code, I realized 
that KCL's own design for parallel consumption of Kinesis data works much 
like our Flink Kafka connector as a whole: each instance of a KCL application 
is a "worker", which consumes records from the multiple Kinesis "shards" 
(partitions) assigned to it in parallel threads. KCL decides which shards are 
assigned to which workers, and can do this dynamically: when new worker 
instances are discovered, shard-to-worker assignments can be reassigned at 
runtime.

Therefore, it seems reasonable to view all Flink consumer tasks of the 
Kinesis connector as the instance pool of a single KCL application. The 
implementation would simply instantiate and run a KCL worker on each consumer 
task, and KCL would handle the shard-to-worker assignment after each consumer 
task starts running.

Unfortunately, there are a few shortcomings if we use KCL:

1. The method for accessing the shards assigned to a Worker instance has 
private access.
2. The logic determining where a Worker instance continues reading from a 
shard after restore is deep in the KCL code and cannot be externally 
configured. It is hardcoded to read from the KCL-managed checkpoint table 
(the DynamoDB "lease table").

These two problems eventually led me to the conclusion that it isn't possible 
for KCL to work with Flink's checkpoint and restore mechanics. I've been 
looking through the KCL source code for ways to bypass the above issues, but 
have come up short of any workable solution.

Therefore, the AWS SDK is the way to go. Although I will have to redo quite a 
bit of work that the KCL already covers, given that the result would be the 
only stream processing engine currently offering built-in exactly-once 
guarantees for Kinesis, I think it is worth the effort. Tasks that we will 
have to work on include:

Task 1: Shard-to-task assignment for parallel data consumption.
Task 2: Assigning multiple streams to the consumer (this is actually still a 
wish-list feature for KCL [1]).
Task 3: Handling Kinesis-side resharding when Kinesis shards are merged or 
split.
Task 4: If Flink eventually supports dynamic reconfiguration of task 
parallelism (e.g. for higher throughput), we will also need to handle 
Flink-side parallelism changes.

I propose that the scope of this JIRA focuses only on tasks 1 and 2. A lot of 
parts can be based on the Kafka connector implementation.
Task 3 requires extensive work that isn't necessary for Kinesis to work 
properly with Flink, and is out of the scope of this ticket.
Task 4 is blocked by the availability of dynamic task parallelism 
reconfiguration at runtime.

Ref:
[1] https://github.com/awslabs/amazon-kinesis-client-nodejs/issues/4
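
Task 1 could, as in the Kafka connector, use a round-robin shard-to-subtask 
assignment that each subtask computes locally. A hedged sketch, assuming every 
subtask sees the same ordered shard list; the class and method names are 
illustrative, not the final connector API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of static round-robin shard-to-task assignment.
// Each subtask can run this locally with no coordination, because all
// subtasks are assumed to see the same ordered list of shard ids.
public class ShardAssigner {

    /** Returns the shard ids the given subtask should consume. */
    public static List<String> assignShards(List<String> allShardIds,
                                            int subtaskIndex, int parallelism) {
        List<String> assigned = new ArrayList<>();
        for (int i = 0; i < allShardIds.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                assigned.add(allShardIds.get(i));
            }
        }
        return assigned;
    }
}
```

The same local rule is what makes the assignment static: it only holds as long 
as the shard list does not change, which is exactly why resharding (Task 3) 
needs separate coordination.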



[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics

2016-01-10 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15091050#comment-15091050
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 1/10/16 1:56 PM:
-

I'm currently working on this and should be ready to open a pull request 
within a week.




[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics

2016-01-10 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15091050#comment-15091050
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 1/10/16 1:56 PM:
-

I'm currently working on this and should be able to open a pull request 
within a week.




[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics

2016-01-10 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15091050#comment-15091050
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


I'm currently working on this and will probably be able to open a pull 
request within a week.






[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics

2016-01-10 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15090974#comment-15090974
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 1/10/16 10:42 AM:
--

Re-clarification of why using the KCL is not viable if we want a deep 
integration of Kinesis with Flink's checkpointing & distributed snapshot 
mechanics for exactly-once guarantees:

After a closer look at KCL's documentation and its source code, I realized 
that KCL's own design for parallel consumption of Kinesis data works much 
like our Flink Kafka connector as a whole: each instance of a KCL application 
is a "worker", which consumes records from the multiple Kinesis "shards" 
(partitions) assigned to it in parallel threads. KCL decides which shards are 
assigned to which workers, and can do this dynamically: when new worker 
instances are discovered, shard-to-worker assignments can be reassigned at 
runtime.

Therefore, it seems reasonable to view all Flink consumer tasks of the 
Kinesis connector as the instance pool of a single KCL application. The 
implementation would simply instantiate and run a KCL worker on each consumer 
task, and KCL would handle the shard-to-worker assignment after each consumer 
task starts running.

Unfortunately, there are a few shortcomings if we use KCL:

1. The method for accessing the shards assigned to a Worker instance has 
private access.
2. The logic determining where a Worker instance continues reading from a 
shard after restore is deep in the KCL code and cannot be externally 
configured. It is hardcoded to read from the KCL-managed checkpoint table 
(the DynamoDB "lease table").

These two problems eventually led me to the conclusion that it isn't possible 
for KCL to work with Flink's checkpoint and restore mechanics. I've been 
looking through the KCL source code for ways to bypass the above issues, but 
have come up short of any workable solution.

Therefore, the AWS SDK is the way to go. Although I will have to redo quite a 
bit of work that the KCL already covers, given that the result would be the 
only stream processing engine currently offering built-in exactly-once 
guarantees for Kinesis, I think it is worth the effort. Tasks that we will 
have to work on include:

Task 1: Shard-to-task assignment for parallel data consumption.
Task 2: Assigning multiple streams to the consumer (this is actually still a 
wish-list feature for KCL [1]).
Task 3: Handling Kinesis-side resharding when Kinesis shards are merged or 
split.
Task 4: If Flink eventually supports dynamic reconfiguration of task 
parallelism (e.g. for higher throughput), we will also need to handle 
Flink-side parallelism changes.

I propose that the scope of this JIRA focuses only on tasks 1 and 2. 
Task 3 requires extensive work that isn't necessary for Kinesis to work 
properly with Flink, and is out of the scope of this ticket.
Task 4 is blocked by the availability of dynamic task parallelism 
reconfiguration at runtime.

Ref:
[1] https://github.com/awslabs/amazon-kinesis-client-nodejs/issues/4



[jira] [Updated] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-17 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3231:
---
Component/s: Kinesis Connector

> Handle Kinesis-side resharding in Kinesis streaming consumer
> 
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time for adjustable 
> stream throughput capacity. This article explains this quite clearly: 
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
> mapping is done in a simple round-robin-like distribution which can be 
> locally determined at each Flink consumer task (Flink Kafka consumer does 
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer 
> tasks coordinate which shards they are currently handling, and allow the 
> tasks to ask the coordinator for a shard reassignment when a task discovers 
> a closed shard at runtime (shards are closed by Kinesis when they are merged 
> or split).
> We need a centralized coordinator state store which is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine which 
> shards they can be reassigned. Amazon KCL uses a DynamoDB table for the 
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
> KCL for the implementation of the consumer if we want to leverage Flink's 
> checkpointing mechanics. For our own implementation, Zookeeper can be used 
> for this state store, but that would require users to set up ZK as well.
> Since this feature requires extensive work, it is opened as a separate 
> sub-task from the basic implementation 
> https://issues.apache.org/jira/browse/FLINK-3229.
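
The coordination sketched in the description amounts to a small claim/close 
protocol over the shared state store. The in-memory version below is purely 
illustrative, assuming hypothetical method names; a real implementation would 
back it with ZooKeeper or DynamoDB and handle task failures:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the centralized coordinator state store: consumer
// tasks atomically claim open shards and mark shards closed when Kinesis
// merges or splits them, so another task never re-reads a closed shard.
public class ShardCoordinatorSketch {
    private final Map<String, Integer> shardOwner = new ConcurrentHashMap<>();
    private final Set<String> closedShards = ConcurrentHashMap.newKeySet();

    /** A task tries to claim a shard; returns true only if it won the claim. */
    public boolean tryClaim(String shardId, int subtaskIndex) {
        return !closedShards.contains(shardId)
                && shardOwner.putIfAbsent(shardId, subtaskIndex) == null;
    }

    /** Called when a task discovers Kinesis has closed a shard (merge/split). */
    public void markClosed(String shardId) {
        closedShards.add(shardId);
        shardOwner.remove(shardId);
    }
}
```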





[jira] [Updated] (FLINK-3983) Allow users to set any (relevant) configuration parameter of the KinesisProducerConfiguration

2016-06-17 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3983:
---
Component/s: Streaming Connectors

> Allow users to set any (relevant) configuration parameter of the 
> KinesisProducerConfiguration
> -
>
> Key: FLINK-3983
> URL: https://issues.apache.org/jira/browse/FLINK-3983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
>
> Currently, users can only set some of the configuration parameters in the 
> {{KinesisProducerConfiguration}} through Properties.
> It would be good to introduce configuration keys for these settings so that 
> users can change the producer configuration.
> I think these and most of the other variables in the 
> KinesisProducerConfiguration should be exposed via properties:
> - aggregationEnabled
> - collectionMaxCount
> - collectionMaxSize
> - connectTimeout
> - credentialsRefreshDelay
> - failIfThrottled
> - logLevel
> - metricsGranularity
> - metricsLevel
> - metricsNamespace
> - metricsUploadDelay
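As a sketch of what that could look like (the property key names below are invented for illustration, not actual connector keys):

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Hypothetical property keys; the real connector would define its own
    // constants for each KinesisProducerConfiguration field listed above.
    static final String AGGREGATION_ENABLED = "aws.producer.aggregationEnabled";
    static final String COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount";

    public static boolean aggregationEnabled(Properties props) {
        // Assumed default: aggregation enabled when the key is absent.
        return Boolean.parseBoolean(props.getProperty(AGGREGATION_ENABLED, "true"));
    }

    public static long collectionMaxCount(Properties props) {
        // Assumed default batch count when the key is absent.
        return Long.parseLong(props.getProperty(COLLECTION_MAX_COUNT, "500"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(AGGREGATION_ENABLED, "false");
        props.setProperty(COLLECTION_MAX_COUNT, "100");
        System.out.println(aggregationEnabled(props));  // prints false
        System.out.println(collectionMaxCount(props));  // prints 100
    }
}
```

Each remaining field in the list above would get its own key and typed accessor in the same pattern.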





[jira] [Updated] (FLINK-4018) Configurable idle time between getRecords requests to Kinesis shards

2016-06-17 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4018:
---
Component/s: Kinesis Connector

> Configurable idle time between getRecords requests to Kinesis shards
> 
>
> Key: FLINK-4018
> URL: https://issues.apache.org/jira/browse/FLINK-4018
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Currently, the Kinesis consumer calls getRecords() immediately after the 
> previous call finishes, which easily hits Amazon's limit of 5 GET requests 
> per shard per second. Although the code already has a backoff & retry 
> mechanism, this will affect other applications consuming from the same 
> Kinesis stream.
> Along with the already existing 
> `KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET`, this new configuration 
> will give users more control over the desired throughput behaviour of the 
> Kinesis consumer.
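A rough sketch of the idea (hypothetical names; the real consumer would read the idle time from its Properties): the configured idle time bounds the per-shard request rate.

```java
public class FetchLoopSketch {

    /**
     * Minimum idle time (ms) between getRecords() calls on one shard so that
     * a single consumer stays under the given per-shard request limit.
     */
    public static long minIdleMillis(int maxRequestsPerShardPerSecond) {
        return 1000L / maxRequestsPerShardPerSecond;
    }

    /** Hypothetical poll loop: fetch a batch, then sleep the configured idle time. */
    static void pollShard(int iterations, long idleMillis) throws InterruptedException {
        for (int i = 0; i < iterations; i++) {
            // getRecords(shardIterator, maxRecordsPerGet) would go here.
            Thread.sleep(idleMillis);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Amazon's limit of 5 GET requests per shard per second implies at
        // least 200 ms of idle time between calls from a single consumer.
        System.out.println(minIdleMillis(5)); // prints 200
        pollShard(2, 1); // tiny values, just to exercise the loop
    }
}
```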





[jira] [Updated] (FLINK-4020) Remove shard list querying from Kinesis consumer constructor

2016-06-17 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4020:
---
Component/s: Kinesis Connector

> Remove shard list querying from Kinesis consumer constructor
> 
>
> Key: FLINK-4020
> URL: https://issues.apache.org/jira/browse/FLINK-4020
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Currently, FlinkKinesisConsumer queries the whole list of shards in its 
> constructor, which forces the client submitting the job to have Kinesis 
> access as well. This is also a drawback for handling Kinesis-side 
> resharding, since we'd want all shard listing, shard-to-task assignment, and 
> shard-end (the result of resharding) handling logic to be done independently 
> within task life cycle methods, with well-defined results.
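One possible shape of the fix, sketched with hypothetical names: the constructor only stores configuration, and the shard listing call is deferred to a task life cycle method such as open().

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class LazyDiscoverySource {

    private final Supplier<List<String>> shardLister; // stands in for a describeStream call
    private List<String> shards;                      // populated in open(), not here

    public LazyDiscoverySource(Supplier<List<String>> shardLister) {
        // No remote calls: the client building the job graph never needs
        // Kinesis access just to construct the source.
        this.shardLister = shardLister;
    }

    /** Runs on the task manager; Kinesis is first contacted here. */
    public void open() {
        this.shards = shardLister.get();
    }

    public List<String> discoveredShards() {
        if (shards == null) {
            throw new IllegalStateException("open() has not been called yet");
        }
        return shards;
    }

    public static void main(String[] args) {
        LazyDiscoverySource source =
            new LazyDiscoverySource(() -> Arrays.asList("shard-0", "shard-1"));
        source.open();
        System.out.println(source.discoveredShards()); // prints [shard-0, shard-1]
    }
}
```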





[jira] [Assigned] (FLINK-4033) Missing Scala example snippets for the Kinesis Connector documentation

2016-06-29 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-4033:
--

Assignee: Tzu-Li (Gordon) Tai

> Missing Scala example snippets for the Kinesis Connector documentation
> --
>
> Key: FLINK-4033
> URL: https://issues.apache.org/jira/browse/FLINK-4033
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Minor
> Fix For: 1.1.0
>
>
> The documentation for the Kinesis connector is missing Scala version of the 
> example snippets.





[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206770#comment-15206770
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 3/22/16 4:52 PM:
-

No problem :) I'll have it ready soon, hopefully by the end of March.


was (Author: tzulitai):
No problem :) I'll have it ready soon.

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud-service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to Flink's handful of streaming connectors to external 
> systems and great outreach to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. The 
> AWS SDK can be used to consume Kinesis data, including reading a stream 
> beginning from a specific offset (or "record sequence number" in Kinesis 
> terminology). On the other hand, AWS officially recommends using the KCL, 
> which offers a higher level of abstraction that also comes with 
> checkpointing and failure recovery, using a KCL-managed AWS DynamoDB "lease 
> table" as the checkpoint state storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing so the user can focus only on streaming application logic. 
> This means we cannot use the KCL to implement the Kinesis streaming 
> connector if we are aiming for a deep integration of Flink with Kinesis that 
> provides exactly-once guarantees (KCL promises only at-least-once). 
> Therefore, the AWS SDK will be the way to go for the implementation of this 
> feature.
> With the ability to read from specific offsets, and given that Kinesis and 
> Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much 
> resemble the Kafka connector. We can basically follow the outline described 
> in [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to Kinesis vs. Kafka differences are 
> described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector's per-broker connections, where each connection can 
> handle multiple Kafka partitions, the Kinesis connector will only need 
> simple per-shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector, which uses Kafka / 
> ZK to sync an outside view of progress, we could probably use ZK or DynamoDB 
> the way KCL does. More thoughts on this part will be very helpful too.
> As for the Kinesis sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the 
> proposed Kinesis consumer, I think it will be better to stick with the AWS 
> SDK for the implementation. The implementation should be straightforward, 
> being almost if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998
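To make the "read from a specific sequence number" property concrete, here is a toy model (plain Java, not the AWS SDK) of resuming a shard read from a checkpointed sequence number, which is what the connector's exactly-once restore relies on:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ShardReadModel {

    /**
     * Returns the records of one shard whose sequence number is at or after
     * the given checkpointed sequence number (records within a shard are
     * ordered by sequence number).
     */
    public static List<Long> readFrom(List<Long> shardSequenceNumbers, long fromSeq) {
        List<Long> resumed = new ArrayList<>();
        for (long seq : shardSequenceNumbers) {
            if (seq >= fromSeq) {
                resumed.add(seq);
            }
        }
        return resumed;
    }

    public static void main(String[] args) {
        List<Long> shard = Arrays.asList(10L, 11L, 12L, 13L);
        // Restore from a checkpoint taken at sequence number 12.
        System.out.println(readFrom(shard, 12L)); // prints [12, 13]
    }
}
```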





[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206770#comment-15206770
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


No problem :)

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>





[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206709#comment-15206709
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


Hi Roy & Robert

Apologies for the late reply.
I already have a working version that I have been using in non-production 
environments for a while now.
The main reason for not sending a PR yet is the lack of tests for the Kinesis 
producer. The main issue is integration tests, which are hard to write since 
Kinesis is an AWS service and doesn't come with a local version.

If this isn't too much of an issue, I can surely send a PR soon!

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>

[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206739#comment-15206739
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


Hi, thanks Roy.

I don't think using the actual Kinesis API will be feasible, since the tests 
will be part of the build.
I did take a look at kinesalite at first and gave it a try, but it was still 
quite tedious to integrate into the code base for the sake of tests, since it 
isn't Java/Scala.

I am still quite new to the PR process in the Flink community, and was 
uncertain about sending out a PR before all tests were properly covered.
Nevertheless, if integration tests aren't too much of an issue for the 
connector and the connector is urgently needed, I can start polishing things 
up and prepare the PR!

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>

[jira] [Commented] (FLINK-3852) Use a StreamExecutionEnvironment in the quickstart job skeleton

2016-05-10 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15279399#comment-15279399
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3852:


Hi Mark,

I think replacing the StreamingJob skeleton as the main class doesn't match 
what this issue is aiming for.

I see 2 approaches here:

1. BatchJob skeleton and StreamingJob skeleton within the same quickstart 
module (i.e. Batch & Streaming Java / Batch & Streaming Scala)
In the javadoc within each skeleton, you can guide the user to use `./bin/flink 
run -c  ` as instructed in 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/cli.html.
Instead of defining a mainClass in the shade plugin, comment out the 
transformers for the mainClass setting, with instructions that the user can 
add this back if they decide to simply use `./bin/flink run ` with either one 
of the skeletons as the main class entry point.

2. Have 4 separate quickstarts, i.e. Batch Java / Batch Scala / Streaming Java 
/ Streaming Scala.

I like the first approach better, since it also has a natural way of letting 
the user know that there are different submission options for a packaged Flink 
jar job.

> Use a StreamExecutionEnvironment in the quickstart job skeleton
> ---
>
> Key: FLINK-3852
> URL: https://issues.apache.org/jira/browse/FLINK-3852
> Project: Flink
>  Issue Type: Task
>  Components: Quickstarts
>Reporter: Robert Metzger
>  Labels: starter
>
> The Job skeleton created by the maven archetype "quickstart" is still setting 
> up an ExecutionEnvironment, not a StreamExecutionEnvironment.
> These days, most users are using Flink for streaming, so we should reflect 
> that in the quickstart as well.





[jira] [Issue Comment Deleted] (FLINK-3852) Use a StreamExecutionEnvironment in the quickstart job skeleton

2016-05-10 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3852:
---
Comment: was deleted

(was: Hi Mark,

I think replacing the StreamingJob skeleton as the main class doesn't match 
what this issue is aiming for.

I see 2 approaches here:

1. BatchJob skeleton and StreamingJob skeleton within the same quickstart 
module (i.e. Batch & Streaming Java / Batch & Streaming Scala)
In the javadoc within each skeleton, you can guide the user to use `./bin/flink 
run -c  ` as instructed in 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/cli.html.
Instead of defining a mainClass in the shade plugin, comment out the 
transformers for the mainClass setting, with instructions that the user can 
add this back if they decide to simply use `./bin/flink run ` with either one 
of the skeletons as the main class entry point.

2. Have 4 separate quickstarts, i.e. Batch Java / Batch Scala / Streaming Java 
/ Streaming Scala.

I like the first approach better, since it also has a natural way of letting 
the user know that there are different submission options for a packaged Flink 
jar job.)

> Use a StreamExecutionEnvironment in the quickstart job skeleton
> ---
>
> Key: FLINK-3852
> URL: https://issues.apache.org/jira/browse/FLINK-3852
> Project: Flink
>  Issue Type: Task
>  Components: Quickstarts
>Reporter: Robert Metzger
>  Labels: starter
>
> The Job skeleton created by the maven archetype "quickstart" is still setting 
> up an ExecutionEnvironment, not a StreamExecutionEnvironment.
> These days, most users are using Flink for streaming, so we should reflect 
> that in the quickstart as well.





[jira] [Commented] (FLINK-3852) Use a StreamExecutionEnvironment in the quickstart job skeleton

2016-05-10 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15279400#comment-15279400
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3852:


Hi Mark,

I think replacing the StreamingJob skeleton as the main class doesn't match 
what this issue is aiming for.

I see 2 approaches here:

1. BatchJob skeleton and StreamingJob skeleton within the same quickstart 
module (i.e. Batch & Streaming Java / Batch & Streaming Scala)
In the javadoc within each skeleton, you can guide the user to use 
{noformat}./bin/flink run -c   
{noformat} as instructed in 
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/cli.html.
Instead of defining a mainClass in the shade plugin, comment out the 
transformers for the mainClass setting, with instructions that the user can 
add this back if they decide to simply use {noformat}./bin/flink run 
{noformat} with either one of the skeletons as the main class entry point.

2. Have 4 separate quickstarts, i.e. Batch Java / Batch Scala / Streaming Java 
/ Streaming Scala.

I like the first approach better, since it also has a natural way of letting 
the user know that there are different submission options for a packaged Flink 
jar job.

> Use a StreamExecutionEnvironment in the quickstart job skeleton
> ---
>
> Key: FLINK-3852
> URL: https://issues.apache.org/jira/browse/FLINK-3852
> Project: Flink
>  Issue Type: Task
>  Components: Quickstarts
>Reporter: Robert Metzger
>  Labels: starter
>
> The Job skeleton created by the maven archetype "quickstart" is still setting 
> up an ExecutionEnvironment, not a StreamExecutionEnvironment.
> These days, most users are using Flink for streaming, so we should reflect 
> that in the quickstart as well.





[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-05-11 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15280969#comment-15280969
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


[~rmetzger]
Hi Robert,
Can you tell me how you were testing the Kinesis connector in AWS EMR before? 
I'd like to try it myself too when I get back, also to learn the process :) 
Thanks.

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to Flink's handful of streaming connectors to external 
> systems and a great outreach to the AWS community.
> AWS supports two different ways to consume Kinesis data: the low-level 
> AWS SDK [1], or the high-level KCL (Kinesis Client Library) [2]. The AWS SDK 
> can be used to consume Kinesis data, including reading a stream from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using the KCL, which offers a higher 
> level of abstraction that also comes with checkpointing and failure recovery, 
> using a KCL-managed AWS DynamoDB "lease table" as the checkpoint state 
> storage.
> However, the KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) assignment and 
> checkpointing to let the user focus only on streaming application logic. 
> This means we cannot use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly-once guarantees (the KCL promises only 
> at-least-once). Therefore, the AWS SDK will be the way to go for the 
> implementation of this feature.
> Given the ability to read from specific offsets, and the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outline described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to Kinesis vs. Kafka 
> differences are described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector's per-broker connections, where a single connection 
> can handle multiple Kafka partitions, the Kinesis connector will only need 
> simple per-shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector, which syncs an 
> outside view of progress to Kafka / ZK, we could probably use ZK or DynamoDB, 
> the way the KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straightforward, being almost 
> if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998
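To make tweak 2 above concrete, here is a minimal, self-contained sketch of the per-shard read loop the connector could run (one loop per shard). The `ShardReader` interface and `Batch` type are stand-ins for the AWS SDK's shard-iterator API (`GetShardIterator` / `GetRecords`), not actual connector code:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the AWS SDK shard-iterator API: getRecords returns a batch
// of records plus the iterator token for the next call; the token is null
// once the shard has been fully read (e.g. a closed shard after resharding).
interface ShardReader {
    Batch getRecords(String shardIterator);
}

class Batch {
    final List<String> records;
    final String nextShardIterator; // null when the shard is exhausted
    Batch(List<String> records, String nextShardIterator) {
        this.records = records;
        this.nextShardIterator = nextShardIterator;
    }
}

public class PerShardLoop {
    // Drains a single shard starting from the given iterator. In the real
    // connector this loop would emit records downstream and track the last
    // sequence number per shard for Flink's checkpointing.
    public static List<String> drainShard(ShardReader reader, String startIterator) {
        List<String> out = new ArrayList<>();
        String iterator = startIterator;
        while (iterator != null) {
            Batch batch = reader.getRecords(iterator);
            out.addAll(batch.records);
            iterator = batch.nextShardIterator;
        }
        return out;
    }
}
```

Since each shard has its own iterator chain, one such loop per shard is all the connection handling the connector needs, unlike the Kafka connector's shared per-broker connections.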



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-05-11 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15279737#comment-15279737
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


https://github.com/tzulitai/flink/tree/FLINK-3229-RemoveKclKplRework

The above link points to the separate branch I have been working on for the related 
changes to the KCL/KPL dependency rework. I've finished replacing the 
KPL in the producer; please feel free to test and review it before we 
decide whether to open a separate issue & PR for this or merge it into the 
original PR.

As for the KCL in the consumer, where we're only using the 'UserRecord' class 
for deaggregating records, I'm currently a bit stuck: the original 
implementation of the class in the KCL uses protobuf-generated code, and I'm 
not sure whether it is suitable to simply make a copy of it in the 
connector. On the other hand, whether we should still deaggregate 
records at all is debatable, because record aggregation is essentially a 
built-in functionality of the KPL, and we aren't using the KPL anymore.
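For reference, the general shape of deaggregation is easy to illustrate without the KCL. The sketch below uses a toy length-prefixed framing instead of the KPL's actual protobuf-based aggregation format (which additionally carries a magic prefix and an MD5 checksum), so it is purely illustrative of the idea of unpacking several user records from one Kinesis record:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy illustration only: this is NOT the KPL wire format. It just shows
// the pack/unpack round trip that aggregation and deaggregation perform.
public class ToyDeaggregator {

    // Pack records as [int length][utf8 bytes]... into one payload.
    static ByteBuffer aggregate(List<String> records) {
        int size = 0;
        for (String r : records) {
            size += 4 + r.getBytes(StandardCharsets.UTF_8).length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        for (String r : records) {
            byte[] b = r.getBytes(StandardCharsets.UTF_8);
            buf.putInt(b.length);
            buf.put(b);
        }
        buf.flip();
        return buf;
    }

    // Unpack the payload back into individual user records.
    static List<String> deaggregate(ByteBuffer aggregated) {
        List<String> out = new ArrayList<>();
        while (aggregated.remaining() >= 4) {
            int len = aggregated.getInt();
            byte[] b = new byte[len];
            aggregated.get(b);
            out.add(new String(b, StandardCharsets.UTF_8));
        }
        return out;
    }
}
```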

Besides the KCL, some more tasks to do for this rework:
1. Tests for the new classes introduced with the rework on the producer
2. I'd like to unify how the consumer / producer are configured. Currently they 
take different approaches, which is strange since they live in the 
same connector. I'll probably go for an approach more like the previous 
producer.
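As one possible direction for point 2, the sketch below shows a single Properties-based validation helper that both the consumer and producer could share. The key names here are invented for illustration and are not the connector's actual configuration constants:

```java
import java.util.Properties;

// Illustrative only: one Properties-driven configuration style shared by
// consumer and producer. Key names are hypothetical, not the real constants.
public class KinesisConfigUtil {
    static final String AWS_REGION = "aws.region";
    static final String ACCESS_KEY = "aws.credentials.basic.accesskeyid";
    static final String SECRET_KEY = "aws.credentials.basic.secretkey";

    // Both FlinkKinesisConsumer and the producer could call this once in
    // their constructors, so misconfiguration fails fast at job submission.
    public static void validate(Properties config) {
        requireNonEmpty(config, AWS_REGION);
        requireNonEmpty(config, ACCESS_KEY);
        requireNonEmpty(config, SECRET_KEY);
    }

    private static void requireNonEmpty(Properties config, String key) {
        String value = config.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("missing required setting: " + key);
        }
    }
}
```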


[jira] [Updated] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3229:
---
Description: 
Opening a sub-task to implement data source consumer for Kinesis streaming 
connector (https://issues.apache.org/jira/browser/FLINK-3211).

An example of the planned user API for Flink Kinesis Consumer:

{code}
Properties kinesisConfig = new Properties();
kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "BASIC");
kinesisConfig.put(
    KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
    "aws_access_key_id_here");
kinesisConfig.put(
    KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
    "aws_secret_key_here");
kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST"); // or TRIM_HORIZON

DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
    "kinesis_stream_name",
    new SimpleStringSchema(),
    kinesisConfig));
{code}

  was:
Opening a sub-task to implement data source consumer for Kinesis streaming 
connector (https://issues.apache.org/jira/browser/FLINK-3211).

An example of the planned user API for Flink Kinesis Consumer:

{code}
Properties kinesisConfig = new Properties();
config.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
config.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, 
"BASIC");
config.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
 "aws_access_key_id_here");
config.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
 "aws_secret_key_here");
config.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST"); 
// or TRIM_HORIZON

DataStream kinesisRecords = env
.addSource(new FlinkKinesisConsumer<>("kinesis_stream_name", new 
SimpleStringSchema(), kinesisConfig);
{code}


> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> --
>
> Key: FLINK-3229
> URL: https://issues.apache.org/jira/browse/FLINK-3229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>





[jira] [Updated] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3229:
---
Description: 
Opening a sub-task to implement data source consumer for Kinesis streaming 
connector (https://issues.apache.org/jira/browser/FLINK-3211).

An example of the planned user API for Flink Kinesis Consumer:

{code}
Properties kinesisConfig = new Properties();
kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
kinesisConfig.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, "BASIC");
kinesisConfig.put(
    KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
    "aws_access_key_id_here");
kinesisConfig.put(
    KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
    "aws_secret_key_here");
kinesisConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST"); // or TRIM_HORIZON

DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
    "kinesis_stream_name",
    new SimpleStringSchema(),
    kinesisConfig));
{code}

  was:
Opening a sub-task to implement data source consumer for Kinesis streaming 
connector (https://issues.apache.org/jira/browser/FLINK-3211).

An example of the planned user API for Flink Kinesis Consumer:

{code}
Properties kinesisConfig = new Properties();
config.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
config.put(KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_TYPE, 
"BASIC");
config.put(
KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID, 
"aws_access_key_id_here");
config.put(
KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"aws_secret_key_here");
config.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST"); 
// or TRIM_HORIZON

DataStream kinesisRecords = env
.addSource(new FlinkKinesisConsumer<>("kinesis_stream_name", new 
SimpleStringSchema(), kinesisConfig);
{code}







[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-04-15 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15243988#comment-15243988
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


https://github.com/tzulitai/flink/tree/FLINK-3229/flink-streaming-connectors/flink-connector-kinesis

Here is the initial working version of FlinkKinesisConsumer that I have been 
testing in off-production environments, updated to correspond to the recent 
Flink 1.0 changes.
I'm still refactoring the code just a bit for easier unit tests. The PR will 
follow very soon.


[jira] [Updated] (FLINK-3211) Add AWS Kinesis streaming connector

2016-04-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3211:
---
Affects Version/s: (was: 1.0.0)
   1.1.0






[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector

2016-04-15 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15243988#comment-15243988
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 4/16/16 3:53 AM:
-

https://github.com/tzulitai/flink/tree/FLINK-3229/flink-streaming-connectors/flink-connector-kinesis

Here is the initial working version of FlinkKinesisConsumer that I have been 
testing in off-production environments, updated to correspond to the recent 
Flink 1.0 changes.

The description of https://issues.apache.org/jira/browse/FLINK-3229 contains 
example code showing how to use the consumer.

I'm still refactoring the code just a bit for easier unit tests. The PR will 
follow very soon.


was (Author: tzulitai):
https://github.com/tzulitai/flink/tree/FLINK-3229/flink-streaming-connectors/flink-connector-kinesis

Here is the initial working version of FlinkKinesisConsumer that I have been 
testing in off-production environments, updated corresponding to the recent 
Flink 1.0 changes.
I'm still refactoring the code just a bit for easier unit tests. The PR will be 
very soon.


[jira] [Commented] (FLINK-3229) Kinesis streaming consumer with integration of Flink's checkpointing mechanics

2016-04-15 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15243991#comment-15243991
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3229:


(Duplicate comment from FLINK-3211. Posting it here also to keep the issue 
updated.)

https://github.com/tzulitai/flink/tree/FLINK-3229/flink-streaming-connectors/flink-connector-kinesis

Here is the initial working version of FlinkKinesisConsumer that I have been 
testing in off-production environments, updated to correspond to the recent 
Flink 1.0 changes.
I'm still refactoring the code just a bit for easier unit tests. The PR will 
follow very soon.

> Kinesis streaming consumer with integration of Flink's checkpointing mechanics
> --
>
> Key: FLINK-3229
> URL: https://issues.apache.org/jira/browse/FLINK-3229
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Opening a sub-task to implement data source consumer for Kinesis streaming 
> connector (https://issues.apache.org/jira/browser/FLINK-3211).
> An example of the planned user API for Flink Kinesis Consumer:
> {code}
> Properties config = new Properties();
> config.put(FlinkKinesisConsumer.CONFIG_STREAM_DESCRIBE_RETRIES, "3");
> config.put(FlinkKinesisConsumer.CONFIG_STREAM_DESCRIBE_BACKOFF_MILLIS, 
> "1000");
> config.put(FlinkKinesisConsumer.CONFIG_STREAM_START_POSITION_TYPE, "latest");
> config.put(FlinkKinesisConsumer.CONFIG_AWS_REGION, "us-east-1");
> AWSCredentialsProvider credentials = // credentials API in AWS SDK
> DataStream kinesisRecords = env
> .addSource(new FlinkKinesisConsumer<>(
> listOfStreams, credentials, new SimpleStringSchema(), config
> ));
> {code}
> Currently still considering which read start positions to support 
> ("TRIM_HORIZON", "LATEST", "AT_SEQUENCE_NUMBER"). The discussions for this 
> can be found in https://issues.apache.org/jira/browser/FLINK-3211.
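The read start positions under consideration map naturally onto a small enum. The sketch below uses hypothetical names (mirroring the Kinesis `ShardIteratorType` values mentioned above) rather than the connector's actual API:

```java
// Hypothetical sketch of the start-position choices discussed above; the
// constant names mirror Kinesis ShardIteratorType values, but this enum is
// not the actual connector API.
enum InitialPosition {
    TRIM_HORIZON,       // start from the oldest record still in the stream
    LATEST,             // start from records arriving after the consumer starts
    AT_SEQUENCE_NUMBER; // start from an explicit record sequence number

    // Parse a user-supplied config value such as "latest" or "TRIM_HORIZON".
    static InitialPosition fromConfig(String value) {
        if (value == null) {
            throw new IllegalArgumentException("start position must be set");
        }
        return valueOf(value.trim().toUpperCase());
    }
}
```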




