[jira] [Updated] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3211:
---
Fix Version/s: (was: 1.0.0)

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great way to reach out to the AWS community.
> After a first look at the AWS KCL (Kinesis Client Library), KCL already 
> supports stream reads beginning from a specific offset (or "record sequence 
> number" in Kinesis terminology). For external checkpointing, KCL is designed 
> to use AWS DynamoDB to checkpoint application state, where each partition's 
> progress (or "shard" in Kinesis terminology) corresponds to a single row in 
> the KCL-managed DynamoDB table.
> So, implementing the AWS Kinesis connector will very much resemble the work 
> done on the Kafka connector, with a few tweaks, as follows (I'm mainly just 
> rewording [~StephanEwen]'s original description [1]):
> 1. Determine the KCL shard worker to Flink source task mapping. KCL already 
> offers worker tasks per shard, so we will need to do the mapping much like in 
> [2].
> 2. Let the Flink connector also maintain a local copy of application state, 
> accessed using the KCL API, for the distributed snapshot checkpointing.
> 3. Restart the KCL at the last Flink-checkpointed record sequence number upon 
> failure. However, when KCL restarts after a failure, it is originally designed 
> to reference the external DynamoDB table. We need a further look at how to 
> work with this so that the Flink checkpoint and the external checkpoint in 
> DynamoDB are properly synced.
> Most of the details regarding KCL's state checkpointing, sharding, shard 
> workers, and failure recovery can be found here [3].
> As for the Kinesis sink connector, it should be fairly straightforward and 
> almost, if not completely, identical to the Kafka sink. The Kinesis sink can 
> be implemented with the AWS KPL (Kinesis Producer Library) [4].
> On the other hand, while KCL and KPL are handy high-level APIs for AWS 
> Kinesis, it might be preferable for the user to use the low-level API in the 
> AWS SDK instead, mainly because the utilities of KCL and KPL come at a cost, 
> such as the extra cost of the DynamoDB table for KCL checkpoints and the 
> extra latency introduced by KPL [4]. For now, we can implement the initial 
> version of the Kinesis connector with KCL and KPL, and leave a 
> user-configurable switch for the connector to use the low-level AWS SDK as a 
> future to-do.
> References:
> [1] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [2] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [3] http://docs.aws.amazon.com/kinesis/latest/dev/advanced-consumers.html
> [4] 
> http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998





[jira] [Updated] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3211:
---
Description: 
AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
cloud service version of Apache Kafka. Support for AWS Kinesis will be a great 
addition to the handful of Flink's streaming connectors to external systems and 
a great way to reach out to the AWS community.

AWS supports two different ways to consume Kinesis data: with the low-level AWS 
SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. The AWS SDK 
can be used to consume Kinesis data, including reading a stream beginning from a 
specific offset (or "record sequence number" in Kinesis terminology). On the 
other hand, AWS officially recommends using KCL, which offers a higher level of 
abstraction that also comes with checkpointing and failure recovery, using a 
KCL-managed AWS DynamoDB table as the checkpoint state storage.

However, KCL is essentially a stream processing library that wraps all the 
partition-to-task (or "shard" in Kinesis terminology) determination and 
checkpointing, allowing the user to focus only on streaming application logic. 
This leads to the understanding that we cannot use the KCL to implement the 
Kinesis streaming connector if we are aiming for a deep integration of Flink 
with Kinesis that provides exactly-once guarantees (KCL promises only 
at-least-once). Therefore, the AWS SDK will be the way to go for the 
implementation of this feature.
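
To make the SDK route concrete, below is a minimal sketch of resuming a single 
shard from a checkpointed sequence number with the AWS Java SDK. The stream 
name, shard id, and checkpointed sequence number are placeholders; this only 
illustrates the SDK calls involved and is not proposed connector code:

{code:java}
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ShardIteratorType;

public class ShardReadSketch {

    // Resume a single shard right after the last checkpointed sequence number.
    public static void readShard(String lastCheckpointedSeqNum) {
        AmazonKinesisClient kinesis = new AmazonKinesisClient();

        GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest()
            .withStreamName("my-stream")              // placeholder
            .withShardId("shardId-000000000000")      // placeholder
            .withShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
            .withStartingSequenceNumber(lastCheckpointedSeqNum);

        String shardIterator =
            kinesis.getShardIterator(iteratorRequest).getShardIterator();

        // a null iterator means the shard is closed and fully consumed
        while (shardIterator != null) {
            GetRecordsResult result = kinesis.getRecords(
                new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1000));
            for (Record record : result.getRecords()) {
                // emit record.getData() downstream; remember record.getSequenceNumber()
                // so the next Flink checkpoint can store it
            }
            shardIterator = result.getNextShardIterator();
        }
    }
}
{code}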

With the ability to read from specific offsets, and the fact that Kinesis and 
Kafka share a lot of similarities, the basic principles of the implementation of 
Flink's Kinesis streaming connector will very much resemble the Kafka connector. 
We can basically follow the outline described in [~StephanEwen]'s description 
[3] and [~rmetzger]'s Kafka connector implementation [4]. A few tweaks due to 
differences between Kinesis and Kafka are described as follows:

1. While the Kafka connector can support reading from multiple topics, I 
currently don't think this is a good idea for Kinesis streams (a Kinesis Stream 
is logically equivalent to a Kafka topic). Kinesis streams can exist in 
different AWS regions, and each Kinesis stream under the same AWS user account 
may have completely independent access settings with different authorization 
keys. Overall, a Kinesis stream feels like a much more consolidated resource 
compared to Kafka topics. It would be great to hear more thoughts on this part.

2. While Kafka has brokers that can hold multiple partitions, the only 
partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast to 
the Kafka connector's per-broker connections, where one connection can handle 
multiple Kafka partitions, the Kinesis connector will only need simple 
per-shard connections.

3. Kinesis itself does not support committing offsets back to Kinesis. If we 
were to implement this feature the way the Kafka connector does with Kafka / ZK, 
we could probably use ZK or DynamoDB, similar to the way KCL works. More 
thoughts on this part will be very helpful too.


As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
Producer Library) [5]. However, for higher code consistency with the proposed 
Kinesis Consumer, I think it will be better to still go with the AWS SDK for 
the implementation. The implementation should be straightforward, being almost, 
if not completely, the same as the Kafka sink.

References:
[1] 
http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
[2] 
http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
[3] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
[4] http://data-artisans.com/kafka-flink-a-practical-how-to/
[5] 
http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998


[jira] [Updated] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3211:
---
Description: 
AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
cloud service version of Apache Kafka. Support for AWS Kinesis will be a great 
addition to the handful of Flink's streaming connectors to external systems and 
a great way to reach out to the AWS community.

AWS supports two different ways to consume Kinesis data: with the low-level AWS 
SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. The AWS SDK 
can be used to consume Kinesis data, including reading a stream beginning from a 
specific offset (or "record sequence number" in Kinesis terminology). On the 
other hand, AWS officially recommends using KCL, which offers a higher level of 
abstraction that also comes with checkpointing and failure recovery, using a 
KCL-managed AWS DynamoDB table as the checkpoint state storage.

However, KCL is essentially a stream processing library that wraps all the 
partition-to-task (or "shard" in Kinesis terminology) determination and 
checkpointing, allowing the user to focus only on streaming application logic. 
This leads to the understanding that we cannot use the KCL to implement the 
Kinesis streaming connector if we are aiming for a deep integration of Flink 
with Kinesis that provides exactly-once guarantees (KCL promises only 
at-least-once). Therefore, the AWS SDK will be the way to go for the 
implementation of this feature.

With the ability to read from specific offsets, and the fact that Kinesis and 
Kafka share a lot of similarities, the basic principles of the implementation of 
Flink's Kinesis streaming connector will very much resemble the Kafka connector. 
We can basically follow the outline described in [~StephanEwen]'s description 
[3] and [~rmetzger]'s Kafka connector implementation [4]. A few tweaks due to 
differences between Kinesis and Kafka are described as follows:

1. While the Kafka connector can support reading from multiple topics, I 
currently don't think this is a good idea for Kinesis streams (a Kinesis stream 
is logically equivalent to a Kafka topic). AWS can manage Kinesis across 
different regions, and each Kinesis stream under the same AWS user account may 
have completely independent access settings with different authorization keys. 
Overall, a Kinesis stream feels like a much more consolidated resource compared 
to Kafka topics. It would be great to hear more thoughts on this part.

2. While Kafka has brokers that can hold multiple partitions, the only 
partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast to 
the Kafka connector's per-broker connections, where one connection can handle 
multiple Kafka partitions, the Kinesis connector will only need simple 
per-shard connections.

3. Kinesis itself does not support committing offsets back to Kinesis. If we 
were to implement this feature the way the Kafka connector does with Kafka / ZK, 
we could probably use ZK or DynamoDB, similar to the way KCL works. More 
thoughts on this part will be very helpful too.


As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
Producer Library) [5]. However, for higher code consistency with the proposed 
Kinesis Consumer, I think it will be better to still go with the AWS SDK for 
the implementation. The implementation should be straightforward, being almost, 
if not completely, the same as the Kafka sink.

References:
[1] 
http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
[2] 
http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
[3] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
[4] http://data-artisans.com/kafka-flink-a-practical-how-to/
[5] 
http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998


[jira] [Updated] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3211:
---
Description: 
AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
cloud service version of Apache Kafka. Support for AWS Kinesis will be a great 
addition to the handful of Flink's streaming connectors to external systems and 
a great way to reach out to the AWS community.

AWS supports two different ways to consume Kinesis data: with the low-level AWS 
SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. The AWS SDK 
can be used to consume Kinesis data, including reading a stream beginning from a 
specific offset (or "record sequence number" in Kinesis terminology). On the 
other hand, AWS officially recommends using KCL, which offers a higher level of 
abstraction that also comes with checkpointing and failure recovery, using a 
KCL-managed AWS DynamoDB table as the checkpoint state storage.

However, KCL is essentially a stream processing library that wraps all the 
partition-to-task (or "shard" in Kinesis terminology) determination and 
checkpointing, allowing the user to focus only on streaming application logic. 
This leads to the understanding that we cannot use the KCL to implement the 
Kinesis streaming connector if we are aiming for a deep integration of Flink 
with Kinesis that provides exactly-once guarantees (KCL promises only 
at-least-once). Therefore, the AWS SDK will be the way to go for the 
implementation of this feature.

With the ability to read from specific offsets, and the fact that Kinesis and 
Kafka share a lot of similarities, the basic principles of the implementation of 
Flink's Kinesis streaming connector will very much resemble the Kafka connector. 
We can basically follow the outline described in [~StephanEwen]'s description 
[3] and [~rmetzger]'s Kafka connector implementation [4]. A few tweaks due to 
differences between Kinesis and Kafka are described as follows:

1. While the Kafka connector can support reading from multiple topics, I 
currently don't think this is a good idea for Kinesis streams (a Kinesis stream 
is logically equivalent to a Kafka topic). Kinesis streams can exist in 
different AWS regions, and each Kinesis stream under the same AWS user account 
may have completely independent access settings with different authorization 
keys. Overall, a Kinesis stream feels like a much more consolidated resource 
compared to Kafka topics. It would be great to hear more thoughts on this part.

2. While Kafka has brokers that can hold multiple partitions, the only 
partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast to 
the Kafka connector's per-broker connections, where one connection can handle 
multiple Kafka partitions, the Kinesis connector will only need simple 
per-shard connections.

3. Kinesis itself does not support committing offsets back to Kinesis. If we 
were to implement this feature the way the Kafka connector does with Kafka / ZK, 
we could probably use ZK or DynamoDB, similar to the way KCL works. More 
thoughts on this part will be very helpful too.


As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
Producer Library) [5]. However, for higher code consistency with the proposed 
Kinesis Consumer, I think it will be better to still go with the AWS SDK for 
the implementation. The implementation should be straightforward, being almost, 
if not completely, the same as the Kafka sink.

References:
[1] 
http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
[2] 
http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
[3] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
[4] http://data-artisans.com/kafka-flink-a-practical-how-to/
[5] 
http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998


[jira] [Updated] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3211:
---
Affects Version/s: 1.0.0

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great way to reach out to the AWS community.
> After a first look at the AWS KCL (Kinesis Client Library), KCL already 
> supports stream reads beginning from a specific offset (or "record sequence 
> number" in Kinesis terminology). For external checkpointing, KCL is designed 
> to use AWS DynamoDB to checkpoint application state, where each partition's 
> progress (or "shard" in Kinesis terminology) corresponds to a single row in 
> the KCL-managed DynamoDB table.
> So, implementing the AWS Kinesis connector will very much resemble the work 
> done on the Kafka connector, with a few tweaks, as follows (I'm mainly just 
> rewording [~StephanEwen]'s original description [1]):
> 1. Determine the KCL shard worker to Flink source task mapping. KCL already 
> offers worker tasks per shard, so we will need to do the mapping much like in 
> [2].
> 2. Let the Flink connector also maintain a local copy of application state, 
> accessed using the KCL API, for the distributed snapshot checkpointing.
> 3. Restart the KCL at the last Flink-checkpointed record sequence number upon 
> failure. However, when KCL restarts after a failure, it is originally designed 
> to reference the external DynamoDB table. We need a further look at how to 
> work with this so that the Flink checkpoint and the external checkpoint in 
> DynamoDB are properly synced.
> Most of the details regarding KCL's state checkpointing, sharding, shard 
> workers, and failure recovery can be found here [3].
> As for the Kinesis sink connector, it should be fairly straightforward and 
> almost, if not completely, identical to the Kafka sink. The Kinesis sink can 
> be implemented with the AWS KPL (Kinesis Producer Library) [4].
> On the other hand, while KCL and KPL are handy high-level APIs for AWS 
> Kinesis, it might be preferable for the user to use the low-level API in the 
> AWS SDK instead, mainly because the utilities of KCL and KPL come at a cost, 
> such as the extra cost of the DynamoDB table for KCL checkpoints and the 
> extra latency introduced by KPL [4]. For now, we can implement the initial 
> version of the Kinesis connector with KCL and KPL, and leave a 
> user-configurable switch for the connector to use the low-level AWS SDK as a 
> future to-do.
> References:
> [1] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [2] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [3] http://docs.aws.amazon.com/kinesis/latest/dev/advanced-consumers.html
> [4] 
> http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998





[jira] [Updated] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3211:
---
Description: 
AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
cloud service version of Apache Kafka. Support for AWS Kinesis will be a great 
addition to the handful of Flink's streaming connectors to external systems and 
a great way to reach out to the AWS community.

AWS supports two different ways to consume Kinesis data: with the low-level AWS 
SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. The AWS SDK 
can be used to consume Kinesis data, including reading a stream beginning from a 
specific offset (or "record sequence number" in Kinesis terminology). On the 
other hand, AWS officially recommends using KCL, which offers a higher level of 
abstraction that also comes with checkpointing and failure recovery, using a 
KCL-managed AWS DynamoDB table as the checkpoint state storage.

However, KCL is essentially a stream processing library that wraps all the 
partition-to-task (or "shard" in Kinesis terminology) determination and 
checkpointing, allowing the user to focus only on streaming application logic. 
This leads to the understanding that we cannot use the KCL to implement the 
Kinesis streaming connector if we are aiming for a deep integration of Flink 
with Kinesis that provides exactly-once guarantees (KCL promises only 
at-least-once). Therefore, the AWS SDK will be the way to go for the 
implementation of this feature.

With the ability to read from specific offsets, and the fact that Kinesis and 
Kafka share a lot of similarities, the basic principles of the implementation of 
Flink's Kinesis streaming connector will very much resemble the Kafka connector. 
We can basically follow the outline described in [~StephanEwen]'s description 
[3] and [~rmetzger]'s Kafka connector implementation [4]. A few tweaks due to 
differences between Kinesis and Kafka are described as follows:

1. While the Kafka connector can support reading from multiple topics, I 
currently don't think this is a good idea for Kinesis streams (a Kinesis stream 
is logically equivalent to a Kafka topic). AWS can manage Kinesis across 
different regions, and each Kinesis stream under the same AWS user account may 
have completely independent access settings with different authorization keys. 
Overall, a Kinesis stream feels like a much more consolidated resource compared 
to Kafka topics. It would be great to hear more thoughts on this part.

2. While Kafka has brokers that can hold multiple partitions, the only 
partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast to 
the Kafka connector's per-broker connections, where one connection can handle 
multiple Kafka partitions, the Kinesis connector will only need simple 
per-shard connections (see the sketch after this list).

3. Kinesis itself does not support committing offsets back to Kinesis. If we 
were to implement this feature the way the Kafka connector does with Kafka / ZK, 
we could probably use ZK or DynamoDB, similar to the way KCL works. More 
thoughts on this part will be very helpful too.


As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
Producer Library) [5]. However, for higher code consistency with the proposed 
Kinesis Consumer, I think it will be better to still go with the AWS SDK for 
the implementation. The implementation should be straightforward, being almost, 
if not completely, the same as the Kafka sink.

References:
[1] 
http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
[2] 
http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
[3] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
[4] http://data-artisans.com/kafka-flink-a-practical-how-to/
[5] 
http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998


[jira] [Updated] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3211:
---
Description: 
AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
cloud service version of Apache Kafka. Support for AWS Kinesis will be a great 
addition to the handful of Flink's streaming connectors to external systems and 
a great way to reach out to the AWS community.

AWS supports two different ways to consume Kinesis data: with the low-level AWS 
SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. The AWS SDK 
can be used to consume Kinesis data, including reading a stream beginning from a 
specific offset (or "record sequence number" in Kinesis terminology). On the 
other hand, AWS officially recommends using KCL, which offers a higher level of 
abstraction that also comes with checkpointing and failure recovery, using a 
KCL-managed AWS DynamoDB table as the checkpoint state storage.

However, KCL is essentially a stream processing library that wraps all the 
partition-to-task (or "shard" in Kinesis terminology) determination and 
checkpointing, allowing the user to focus only on streaming application logic. 
This leads to the understanding that we cannot use the KCL to implement the 
Kinesis streaming connector if we are aiming for a deep integration of Flink 
with Kinesis that provides exactly-once guarantees (KCL promises only 
at-least-once). Therefore, the AWS SDK will be the way to go for the 
implementation of this feature.

With the ability to read from specific offsets, and the fact that Kinesis and 
Kafka share a lot of similarities, the basic principles of the implementation of 
Flink's Kinesis streaming connector will very much resemble the Kafka connector. 
We can basically follow the outline described in [~StephanEwen]'s description 
[3] and [~rmetzger]'s Kafka connector implementation [4]. A few tweaks due to 
differences between Kinesis and Kafka are described as follows:

1. While the Kafka connector can support reading from multiple topics, I 
currently don't think this is a good idea for Kinesis streams (a Kinesis stream 
is logically equivalent to a Kafka topic). AWS can manage Kinesis across 
different regions, and each Kinesis stream under the same AWS user account may 
have completely independent access settings with different authorization keys. 
Overall, a Kinesis stream feels like a much more consolidated resource compared 
to Kafka topics. It would be great to hear more thoughts on this part.

2. While Kafka has brokers that can hold multiple partitions, the only 
partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast to 
the Kafka connector's per-broker connections, where one connection can handle 
multiple Kafka partitions, the Kinesis connector will only need simple 
per-shard connections.

3. Kinesis itself does not support committing offsets back to Kinesis. If we 
were to implement this feature the way the Kafka connector does with Kafka / ZK, 
we could probably use ZK or DynamoDB, similar to the way KCL works. More 
thoughts on this part will be very helpful too.


As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
Producer Library) [5]. However, for higher code consistency with the proposed 
Kinesis Consumer, I think it will be better to still go with the AWS SDK for 
the implementation. The implementation should be straightforward, being almost, 
if not completely, the same as the Kafka sink.

References:
[1] 
http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
[2] 
http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
[3] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
[4] http://data-artisans.com/kafka-flink-a-practical-how-to/
[5] 
http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998


[jira] [Updated] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3211:
---
Description: 
AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
cloud service version of Apache Kafka. Support for AWS Kinesis will be a great 
addition to the handful of Flink's streaming connectors to external systems and 
a great way to reach out to the AWS community.

AWS supports two different ways to consume Kinesis data: with the low-level AWS 
SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. The AWS SDK 
can be used to consume Kinesis data, including reading a stream beginning from a 
specific offset (or "record sequence number" in Kinesis terminology). On the 
other hand, AWS officially recommends using KCL, which offers a higher level of 
abstraction that also comes with checkpointing and failure recovery, using a 
KCL-managed AWS DynamoDB table as the checkpoint state storage.

However, KCL is essentially a stream processing library that wraps all the 
partition-to-task (or "shard" in Kinesis terminology) determination and 
checkpointing, allowing the user to focus only on streaming application logic. 
This leads to the understanding that we cannot use the KCL to implement the 
Kinesis streaming connector if we are aiming for a deep integration of Flink 
with Kinesis that provides exactly-once guarantees (KCL promises only 
at-least-once). Therefore, the AWS SDK will be the way to go for the 
implementation of this feature.

With the ability to read from specific offsets, and the fact that Kinesis and 
Kafka share a lot of similarities, the basic principles of the implementation of 
Flink's Kinesis streaming connector will very much resemble the Kafka connector. 
We can basically follow the outline described in [~StephanEwen]'s description 
[3] and [~rmetzger]'s Kafka connector implementation [4]. A few tweaks due to 
differences between Kinesis and Kafka are described as follows:

1. While the Kafka connector can support reading from multiple topics, I 
currently don't think this is a good idea for Kinesis streams (a Kinesis Stream 
is logically equivalent to a Kafka topic). Kinesis streams can exist in 
different AWS regions, and each Kinesis stream under the same AWS user account 
may have completely independent access settings with different authorization 
keys. Overall, a Kinesis stream feels like a much more consolidated resource 
compared to Kafka topics. It would be great to hear more thoughts on this part.

2. While Kafka has brokers that can hold multiple partitions, the only 
partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast to 
the Kafka connector's per-broker connections, where one connection can handle 
multiple Kafka partitions, the Kinesis connector will only need simple 
per-shard connections.

3. Kinesis itself does not support committing offsets back to Kinesis. If we 
were to implement this feature the way the Kafka connector does with Kafka / ZK, 
we could probably use ZK or DynamoDB, similar to the way KCL works. More 
thoughts on this part will be very helpful too.


As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
Producer Library) [5]. However, for higher code consistency with the proposed 
Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
implementation. The implementation should be straightforward, being almost, if 
not completely, the same as the Kafka sink.
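
For illustration, the per-record logic of an SDK-based sink could be as small as 
the following sketch (the stream name is a placeholder, and a real 
implementation would batch with PutRecords and handle throttling and retries):

{code:java}
import java.nio.ByteBuffer;

import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.PutRecordRequest;

// Minimal per-record sink logic using only the low-level AWS SDK.
public class KinesisSinkSketch {
    private final AmazonKinesisClient kinesis = new AmazonKinesisClient();

    public void invoke(String partitionKey, byte[] serializedValue) {
        kinesis.putRecord(new PutRecordRequest()
            .withStreamName("my-stream")              // placeholder
            .withPartitionKey(partitionKey)
            .withData(ByteBuffer.wrap(serializedValue)));
    }
}
{code}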

References:
[1] 
http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
[2] 
http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
[3] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
[4] http://data-artisans.com/kafka-flink-a-practical-how-to/
[5] 
http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998


[GitHub] flink pull request: [FLINK-1870] Reintroduce indexed reader functi...

2016-01-09 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1483#issuecomment-170244363
  
I understand your skepticism, but I don't see it as being as dangerous as you 
do. If somebody accesses this information and does not use it properly, it's 
his/her own fault... (same as using a static variable in a UDF -- we cannot 
"protect" users from writing bad code).

Nevertheless, I originally had the same idea as you suggest (i.e., using 
`processRecord()`). However, the compatibility layer also uses two 
`CoFlatMapFunction`s which need to access this information, too:

 - 
https://github.com/apache/flink/blob/master/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/TwoFlinkStreamsMerger.java
 - 
https://github.com/apache/flink/blob/master/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/StormFlinkStreamMerger.java

It would be possible to rewrite both functions as custom operators, but 
this makes the translation code for multiple input streams more complex. Thus, 
I would prefer to expose this information in `RuntimeContext`. We can also 
extend the JavaDoc to warn users to use this information carefully...
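
    For illustration, this is the shape both mergers have -- a plain 
`RichCoFlatMapFunction` that can only get at such information via 
`RuntimeContext` (the accessor name below is hypothetical, standing in for 
whatever this PR ends up exposing):

```java
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Sketch: a merger implemented as a (rich) function rather than a custom
// operator, reading the extra information from the RuntimeContext.
public class MergerSketch extends RichCoFlatMapFunction<String, String, String> {

    @Override
    public void flatMap1(String value, Collector<String> out) {
        // hypothetical accessor -- the actual name/placement is what this PR
        // is discussing:
        // int index = getRuntimeContext().getProducerTaskIndex();
        out.collect(value);
    }

    @Override
    public void flatMap2(String value, Collector<String> out) {
        out.collect(value);
    }
}
```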




[GitHub] flink pull request: [FLINK-1994] [ml] Add different gain calculati...

2016-01-09 Thread rawkintrevo
Github user rawkintrevo commented on the pull request:

https://github.com/apache/flink/pull/1397#issuecomment-170257023
  
I'm still refactoring to an enumeration, but wanted to toss this up in case 
anyone is watching -- I added a decay parameter which significantly generalizes 
the Xu and Inverse Scaling methods. I also added some docs, but am open to 
suggestions; for example, once I figure out how to set the method via the 
enumeration, I'll include a note.




[jira] [Commented] (FLINK-1994) Add different gain calculation schemes to SGD

2016-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15090670#comment-15090670
 ] 

ASF GitHub Bot commented on FLINK-1994:
---

Github user rawkintrevo commented on the pull request:

https://github.com/apache/flink/pull/1397#issuecomment-170257023
  
I'm still refactoring to an enumeration, but wanted to toss this up in case 
anyone is watching -- I added a decay parameter which significantly generalizes 
the Xu and Inverse Scaling methods. I also added some docs, but am open to 
suggestions; for example, once I figure out how to set the method via the 
enumeration, I'll include a note.
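
For context, the gain schedules under discussion in this issue boil down to the 
following (a sketch with illustrative parameter names, written in Java here 
purely for compactness -- FlinkML itself is Scala):

{code:java}
// Sketch of the gain (learning rate) schedules discussed in FLINK-1994.
public final class GainSchedules {

    // current behavior: stepsize / sqrt(t)
    static double invSqrt(double stepsize, int t) {
        return stepsize / Math.sqrt(t);
    }

    // proposed: stepsize / (1 + t)
    static double inverse(double stepsize, int t) {
        return stepsize / (1 + t);
    }

    // proposed (Xu): stepsize * (1 + regularization * stepsize * t)^(-3/4)
    static double xu(double stepsize, double regularization, int t) {
        return stepsize * Math.pow(1 + regularization * stepsize * t, -0.75);
    }
}
{code}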


> Add different gain calculation schemes to SGD
> -
>
> Key: FLINK-1994
> URL: https://issues.apache.org/jira/browse/FLINK-1994
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>Assignee: Trevor Grant
>Priority: Minor
>  Labels: ML, Starter
>
> The current SGD implementation uses the formula {{stepsize/sqrt(iterationNumber)}} 
> as the gain for the weight updates. It would be good to make the gain 
> calculation configurable and to provide different strategies for that. For 
> example:
> * stepsize/(1 + iterationNumber)
> * stepsize*(1 + regularization * stepsize * iterationNumber)^(-3/4)
> See also how to properly select the gains [1].
> Resources:
> [1] http://arxiv.org/pdf/1107.2490.pdf





[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15090928#comment-15090928
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


A follow-up regarding whether or not the Kinesis streaming connector should 
support parallel data pulling from multiple Kinesis streams.

It makes sense to pull from multiple Kinesis streams if they are in the same 
AWS region, since globally deployed pipelines usually follow central-regional 
practices, where a message queue like Kinesis / Kafka will also be deployed in 
the central hub to ingest intermediate results from the regional hubs.

Therefore, it is unlikely that there are use cases where Flink will need to 
consume data from cross-region AWS Kinesis streams. I think it's a good balance 
to design the Flink Kinesis streaming connector with support for parallel pulls 
from multiple Kinesis streams, with the restriction that they are all deployed 
in the same AWS region.
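
To make the restriction concrete, the consumer's configuration could look along 
these lines (a hypothetical API shape with hypothetical config keys, not a 
settled design):

{code:java}
import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class MultiStreamSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // One region and one credential shared by all subscribed streams.
        Properties config = new Properties();
        config.setProperty("aws.region", "us-east-1");
        config.setProperty("aws.accessKeyId", "ACCESS_KEY_ID");
        config.setProperty("aws.secretKey", "SECRET_KEY");

        // FlinkKinesisConsumer is a hypothetical class name for the proposed
        // connector: several streams, restricted to the configured region.
        env.addSource(new FlinkKinesisConsumer<>(
            Arrays.asList("central-stream", "regional-results"),
            new SimpleStringSchema(),
            config)).print();

        env.execute("kinesis multi-stream sketch");
    }
}
{code}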

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great way to reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. The AWS 
> SDK can be used to consume Kinesis data, including reading a stream beginning 
> from a specific offset (or "record sequence number" in Kinesis terminology). 
> On the other hand, AWS officially recommends using KCL, which offers a higher 
> level of abstraction that also comes with checkpointing and failure recovery, 
> using a KCL-managed AWS DynamoDB table as the checkpoint state storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing, allowing the user to focus only on streaming application 
> logic. This leads to the understanding that we cannot use the KCL to 
> implement the Kinesis streaming connector if we are aiming for a deep 
> integration of Flink with Kinesis that provides exactly-once guarantees (KCL 
> promises only at-least-once). Therefore, the AWS SDK will be the way to go 
> for the implementation of this feature.
> With the ability to read from specific offsets, and the fact that Kinesis 
> and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outline described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to differences between Kinesis and 
> Kafka are described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector's per-broker connections, where one connection can 
> handle multiple Kafka partitions, the Kinesis connector will only need simple 
> per-shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature the way the Kafka connector does with Kafka / 
> ZK to sync an outside view of progress, we could probably use ZK or DynamoDB, 
> similar to the way KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straightforward, being almost, 
> if not completely, the same as the Kafka sink.
> References:
> [1] 
> 

[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15090928#comment-15090928
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 1/10/16 6:40 AM:
-

A follow-up regarding whether or not the Kinesis streaming connector should 
support parallel data pulling from multiple Kinesis streams.

It makes sense to pull from multiple Kinesis streams if they are in the same 
AWS region, since globally deployed pipelines usually follow central-regional 
practices, where a message queue like Kinesis / Kafka will also be deployed in 
the central hub to ingest intermediate results from the regional hubs.

Therefore, it is unlikely that there are use cases where Flink will need to 
consume data from cross-region AWS Kinesis streams. I think it's a good balance 
to design the Flink Kinesis streaming connector with support for parallel pulls 
from multiple Kinesis streams, with the restriction that they are all deployed 
in the same AWS region and that they all have identical access settings 
(i.e., can all be accessed with the same single AWS credential).



> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great way to reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. The AWS 
> SDK can be used to consume Kinesis data, including reading a stream beginning 
> from a specific offset (or "record sequence number" in Kinesis terminology). 
> On the other hand, AWS officially recommends using KCL, which offers a higher 
> level of abstraction that also comes with checkpointing and failure recovery, 
> using a KCL-managed AWS DynamoDB table as the checkpoint state storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing, allowing the user to focus only on streaming application 
> logic. This leads to the understanding that we cannot use the KCL to 
> implement the Kinesis streaming connector if we are aiming for a deep 
> integration of Flink with Kinesis that provides exactly-once guarantees (KCL 
> promises only at-least-once). Therefore, the AWS SDK will be the way to go 
> for the implementation of this feature.
> With the ability to read from specific offsets, and the fact that Kinesis 
> and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outline described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to differences between Kinesis and 
> Kafka are described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning 

[jira] [Updated] (FLINK-3211) Add AWS Kinesis streaming connector

2016-01-09 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-3211:
---
Description: 
AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
cloud service version of Apache Kafka. Support for AWS Kinesis will be a great 
addition to the handful of Flink's streaming connectors to external systems and 
a great way to reach out to the AWS community.

After a first look at the AWS KCL (Kinesis Client Library), KCL already 
supports stream reads beginning from a specific offset (or "record sequence 
number" in Kinesis terminology). For external checkpointing, KCL is designed 
to use AWS DynamoDB to checkpoint application state, where each partition's 
progress (or "shard" in Kinesis terminology) corresponds to a single row in the 
KCL-managed DynamoDB table.

So, implementing the AWS Kinesis connector will very much resemble the work 
done on the Kafka connector, with a few tweaks, as follows (I'm mainly just 
rewording [~StephanEwen]'s original description [1]):

1. Determine the KCL shard worker to Flink source task mapping. KCL already 
offers worker tasks per shard, so we will need to do the mapping much like in [2].

2. Let the Flink connector also maintain a local copy of application state, 
accessed using the KCL API, for the distributed snapshot checkpointing.

3. Restart the KCL at the last Flink-checkpointed record sequence number upon 
failure. However, when KCL restarts after a failure, it is originally designed to 
reference the external DynamoDB table. We need a further look at how to work with 
this so that the Flink checkpoint and the external checkpoint in DynamoDB are 
properly synced.

Most of the details regarding KCL's state checkpointing, sharding, shard 
workers, and failure recovery can be found here [3].

As for the Kinesis sink connector, it should be fairly straightforward and 
almost, if not completely, identical to the Kafka sink. The Kinesis sink can be 
implemented with the AWS KPL (Kinesis Producer Library) [4], as sketched below.
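
For reference, producing with the KPL is only a few calls -- a minimal sketch 
(stream name, partition key, and region are placeholders):

{code:java}
import java.nio.ByteBuffer;

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

public class KplSinkSketch {

    // The KPL batches and retries internally -- which is also where the
    // extra latency mentioned below comes from.
    private final KinesisProducer producer = new KinesisProducer(
        new KinesisProducerConfiguration().setRegion("us-east-1"));

    public void send(String partitionKey, byte[] serializedValue) {
        producer.addUserRecord("my-stream", partitionKey,
            ByteBuffer.wrap(serializedValue));
    }

    public void flush() {
        producer.flushSync();  // block until all buffered records are sent
    }
}
{code}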

On the other hand, while KCL and KPL are handy high-level APIs for AWS Kinesis, 
it might be preferable for the user to use the low-level API in the AWS SDK 
instead, mainly because the utilities of KCL and KPL come at a cost, such as the 
extra cost of the DynamoDB table for KCL checkpoints and the extra latency 
introduced by KPL [4]. For now, we can implement the initial version of the 
Kinesis connector with KCL and KPL, and leave a user-configurable switch for the 
connector to use the low-level AWS SDK as a future to-do.

References:
[1] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
[2] http://data-artisans.com/kafka-flink-a-practical-how-to/
[3] http://docs.aws.amazon.com/kinesis/latest/dev/advanced-consumers.html
[4] 
http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998
