[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15091408#comment-15091408 ]

Tzu-Li (Gordon) Tai commented on FLINK-3211: Thanks [~StephanEwen] for the response and the help with the JIRA permission assignment. Noted on the implementation: the user will be able to decide where in the Kinesis stream to start reading ;)

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Affects Versions: 1.0.0
> Reporter: Tzu-Li (Gordon) Tai
> Original Estimate: 336h
> Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a
> cloud-service version of Apache Kafka. Support for AWS Kinesis would be a
> great addition to Flink's handful of streaming connectors to external
> systems, and great outreach to the AWS community.
> AWS supports two different ways to consume Kinesis data: the low-level
> AWS SDK [1], or the high-level KCL (Kinesis Client Library) [2]. The AWS SDK
> can be used to consume Kinesis data, including reading a stream starting from a
> specific offset (a "record sequence number" in Kinesis terminology). On the
> other hand, AWS officially recommends the KCL, which offers a higher level
> of abstraction and also comes with checkpointing and failure recovery,
> using a KCL-managed AWS DynamoDB "lease table" as the checkpoint state
> storage.
> However, the KCL is essentially a stream processing library that wraps all the
> partition-to-task (or "shard", in Kinesis terminology) assignment and
> checkpointing, letting the user focus only on streaming application logic.
> This means we cannot use the KCL to implement the
> Kinesis streaming connector if we are aiming for a deep integration of Flink
> with Kinesis that provides exactly-once guarantees (the KCL promises only
> at-least-once).
> Therefore, the AWS SDK is the way to go for the
> implementation of this feature.
> With the ability to read from specific offsets, and given that
> Kinesis and Kafka share a lot of similarities, the basic principles of
> Flink's Kinesis streaming connector will very much resemble the Kafka
> connector. We can basically follow the outline in
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector
> implementation [4]. A few tweaks due to Kinesis vs. Kafka
> differences are described as follows:
> 1. While the Kafka connector can read from multiple topics, I
> currently don't think this is a good idea for Kinesis streams (a Kinesis
> stream is logically equivalent to a Kafka topic). Kinesis streams can exist
> in different AWS regions, and each Kinesis stream under the same AWS user
> account may have completely independent access settings with different
> authorization keys. Overall, a Kinesis stream feels like a much more
> consolidated resource compared to Kafka topics. It would be great to hear
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast
> to the Kafka connector's per-broker connections, where one connection
> can handle multiple Kafka partitions, the Kinesis connector only needs
> simple per-shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we
> were to implement this feature like the Kafka connector, which uses Kafka / ZK to
> sync an outside view of progress, we could probably use ZK or DynamoDB the
> way the KCL does. More thoughts on this part would be very helpful too.
> As for the Kinesis sink, it should be possible to use the AWS KPL (Kinesis
> Producer Library) [5].
> However, for higher code consistency with the proposed
> Kinesis consumer, I think it is better to stick with the AWS SDK for the
> implementation. The implementation should be straightforward, being almost
> if not completely the same as the Kafka sink.
> References:
> [1] http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] http://docs.aws.amazon.com/kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
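The exactly-once design above hinges on the SDK's ability to resume a shard read from a checkpointed sequence number (the `AFTER_SEQUENCE_NUMBER` shard-iterator semantics). A minimal sketch of that resume logic, using a hypothetical in-memory stand-in for the Kinesis client rather than real AWS SDK calls (`StubShard` and `resume_read` are illustrative names, not part of any actual API; real Kinesis sequence numbers are opaque strings, simplified here to integers):

```python
# Illustrative stand-in for a Kinesis shard: records carry increasing
# sequence numbers, and a consumer resumes AFTER a checkpointed one.
class StubShard:
    def __init__(self, records):
        # records: list of (sequence_number, payload) in arrival order
        self.records = records

    def get_iterator_after(self, seq):
        # Mimics ShardIteratorType.AFTER_SEQUENCE_NUMBER: skip every record
        # up to and including the checkpointed sequence number.
        return [payload for s, payload in self.records if s > seq]


def resume_read(shard, checkpointed_seq):
    """Resume consumption after the last checkpointed sequence number."""
    return shard.get_iterator_after(checkpointed_seq)


shard = StubShard([(1, "a"), (2, "b"), (3, "c"), (4, "d")])
# A Flink checkpoint recorded that everything up to sequence number 2
# was fully processed, so recovery re-reads only "c" and "d".
print(resume_read(shard, 2))  # -> ['c', 'd']
```

The point of the sketch is that as long as the connector snapshots one sequence number per shard in Flink's checkpoints, recovery never replays records past a completed checkpoint, which is exactly what the KCL's DynamoDB-managed checkpointing prevents us from controlling.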
[jira] [Assigned] (FLINK-3211) Add AWS Kinesis streaming connector
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai reassigned FLINK-3211:

Assignee: Tzu-Li (Gordon) Tai
[jira] [Commented] (FLINK-3212) JobManagerCheckpointRecoveryITCase
[ https://issues.apache.org/jira/browse/FLINK-3212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15091516#comment-15091516 ]

Matthias J. Sax commented on FLINK-3212: Again: this time, the test hung for 300 seconds and the build got canceled: https://travis-ci.org/mjsax/flink/jobs/101460076

> JobManagerCheckpointRecoveryITCase
> --
>
> Key: FLINK-3212
> URL: https://issues.apache.org/jira/browse/FLINK-3212
> Project: Flink
> Issue Type: Bug
> Components: Tests
> Reporter: Matthias J. Sax
> Priority: Critical
> Labels: test-stability
>
> {noformat}
> Tests in error:
>   JobManagerCheckpointRecoveryITCase.testCheckpointRecoveryFailure:354 » IllegalState
>   JobManagerCheckpointRecoveryITCase.testCheckpointedStreamingSumProgram:192 » IO
> {noformat}
> https://travis-ci.org/mjsax/flink/jobs/101407273
[jira] [Created] (FLINK-3214) WindowCheckpointingITCase
Matthias J. Sax created FLINK-3214:
--
Summary: WindowCheckpointingITCase
Key: FLINK-3214
URL: https://issues.apache.org/jira/browse/FLINK-3214
Project: Flink
Issue Type: Bug
Components: Tests
Reporter: Matthias J. Sax
Priority: Critical

No output for 300 seconds; the build got canceled. https://travis-ci.org/apache/flink/jobs/101407292
[jira] [Commented] (FLINK-1994) Add different gain calculation schemes to SGD
[ https://issues.apache.org/jira/browse/FLINK-1994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15091533#comment-15091533 ]

ASF GitHub Bot commented on FLINK-1994:

Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1397#issuecomment-170451422

I would still like to use an enumeration, because with string parameters the user cannot check that the input parameter is valid. But if you have problems using an enumeration, I'll modify your pull request before merging it. The failing CI seems unrelated to this PR; I'll check and test this.

> Add different gain calculation schemes to SGD
> -
>
> Key: FLINK-1994
> URL: https://issues.apache.org/jira/browse/FLINK-1994
> Project: Flink
> Issue Type: Improvement
> Components: Machine Learning Library
> Reporter: Till Rohrmann
> Assignee: Trevor Grant
> Priority: Minor
> Labels: ML, Starter
>
> The current SGD implementation uses the formula {{stepsize/sqrt(iterationNumber)}}
> as the gain for the weight updates. It would be good to make the gain
> calculation configurable and to provide different strategies for it. For
> example:
> * stepsize/(1 + iterationNumber)
> * stepsize*(1 + regularization * stepsize * iterationNumber)^(-3/4)
> See also [1] on how to properly select the gains.
> Resources:
> [1] http://arxiv.org/pdf/1107.2490.pdf
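The three gain schedules discussed in the ticket can be written down directly; a short sketch of each, where the function names are illustrative and not the eventual Flink ML API (`stepsize` and `regularization` are user parameters, `t` is the iteration number):

```python
import math

# Current scheme: stepsize / sqrt(t)
def gain_inv_sqrt(stepsize, t):
    return stepsize / math.sqrt(t)

# Proposed alternative: stepsize / (1 + t)
def gain_inv_linear(stepsize, t):
    return stepsize / (1 + t)

# Proposed alternative, cf. the referenced paper [1]:
# stepsize * (1 + regularization * stepsize * t)^(-3/4)
def gain_xu(stepsize, regularization, t):
    return stepsize * (1 + regularization * stepsize * t) ** -0.75

# All three decay toward zero as the iteration number grows, but at
# different rates, which is why making the choice configurable matters.
for t in (1, 10, 100):
    print(gain_inv_sqrt(0.1, t), gain_inv_linear(0.1, t), gain_xu(0.1, 0.01, t))
```

An enumeration selecting among these (as chiwanpark suggests) would let the connector validate the choice at configuration time rather than failing on an unrecognized string at runtime.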
[jira] [Updated] (FLINK-3211) Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai updated FLINK-3211:

Summary: Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics (was: Add AWS Kinesis streaming connector)
[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15090974#comment-15090974 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 1/10/16 1:30 PM:

Re-clarification of why using the KCL is not viable if we want a deep integration of Kinesis with Flink's checkpointing & distributed snapshot mechanics for exactly-once guarantees:

After a closer look at the KCL's documentation and its source code, I realized that the KCL's own design for parallel consumption of Kinesis data works pretty much like our Flink Kafka connector as a whole: each instance of a KCL application is a "worker", which consumes records, in parallel threads, from the multiple Kinesis "shards" (partitions) assigned to it. The KCL handles which shards are assigned to which workers, and can do this dynamically, in the sense that when new worker instances are discovered, shard-to-worker assignments can be changed at runtime.

Therefore, it seems reasonable to view all Flink consumer tasks of the Kinesis connector as the instance pool of a single KCL application. The implementation would simply instantiate and run a KCL worker on each consumer task, and the KCL would handle the shard-to-worker assignment after each consumer task starts running. Unfortunately, there are a few shortcomings with the KCL:

1. The method to access the shards assigned to a Worker instance has private access.
2. The logic of where a Worker instance continues reading a shard after restore is deep in the KCL code and cannot be externally configured. It is hardcoded to read from the KCL-managed checkpoint table (the DynamoDB "lease table").

These two problems eventually led me to the conclusion that it isn't possible to make the KCL work with Flink's checkpoint and restore mechanics. I've been looking through the KCL source code for hacky ways to bypass the above issues, but have come up short. Therefore, the AWS SDK is the way to go.

Although I will have to redo quite a bit of work that the KCL has already covered, given that we would end up with the only built-in, exactly-once-guaranteeing processing engine for Kinesis currently available, I think it is worth the effort. Tasks we will have to work on include:

Task 1: Shard-to-task assignment for parallel data consumption.
Task 2: Assigning multiple streams to the consumer (this is actually still a wish-list feature for the KCL [1]).
Task 3: Handling Kinesis-side resharding when Kinesis shards are merged or split.
Task 4: If Flink can in the future support dynamic reconfiguration of task parallelism (maybe for higher throughput), we will also need to handle Flink-side parallelism changes.

I propose that the scope of this JIRA focus only on tasks 1 and 2. A lot of parts can be based on the Kafka connector implementation. Task 3 involves a lot of extensive work which isn't necessary for Kinesis to work properly with Flink, and is out of the scope of this ticket. Task 4 is blocked by the availability of dynamic task parallelism reconfiguration at runtime.

Ref:
[1] https://github.com/awslabs/amazon-kinesis-client-nodejs/issues/4
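Task 1 above (shard-to-task assignment) can be made deterministic, so that every parallel subtask computes the same global assignment without any coordination service. A hypothetical sketch using simple round-robin by sorted shard index (the actual connector might hash shard IDs instead; `assign_shards` is an illustrative name, not Flink API):

```python
def assign_shards(shards, parallelism):
    """Deterministic round-robin assignment of shards to subtasks.

    Because the assignment depends only on the (sorted) shard list and the
    parallelism, every subtask can compute it independently and all
    subtasks agree on the result -- no ZK/DynamoDB needed for the static case.
    """
    assignment = {i: [] for i in range(parallelism)}
    for index, shard in enumerate(sorted(shards)):
        assignment[index % parallelism].append(shard)
    return assignment


shards = ["shardId-000", "shardId-001", "shardId-002",
          "shardId-003", "shardId-004"]
print(assign_shards(shards, 2))
# Subtask 0 reads shards 000, 002, 004; subtask 1 reads shards 001, 003.
```

This covers only the static case; handling resharding (Task 3) or a parallelism change (Task 4) would require recomputing and redistributing the assignment, which is exactly why those tasks are scoped out of this ticket.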
[jira] [Updated] (FLINK-3211) Add AWS Kinesis streaming connector
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai updated FLINK-3211:

Summary: Add AWS Kinesis streaming connector (was: Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics)
[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15090974#comment-15090974 ]

Tzu-Li (Gordon) Tai commented on FLINK-3211:
[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15090974#comment-15090974 ] Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 1/10/16 12:03 PM: -- Re-clarification of why using the KCL is not viable if we want a deep integration of Kinesis with Flink's checkpointing & distributed snapshot mechanics for exactly-once guarantees: After a closer look at KCL's documentation and its source code, I realized that KCL's own design for parallel consumption of Kinesis data works much like our Flink Kafka connector as a whole: each instance of a KCL application is a "worker", which consumes records from the multiple Kinesis "shards" (partitions) assigned to it in parallel threads. KCL handles which shards are assigned to which workers, and can do this dynamically, in the sense that when new worker instances are discovered, shard-to-worker assignments can be reassigned at runtime. Therefore, it seems reasonable to view all Flink consumer tasks of the Kinesis connector as the instance pool of a single KCL application. The implementation would simply instantiate and run a KCL worker on each consumer task, and KCL would handle the shard-to-worker assignment after each consumer task starts running. Unfortunately, there are a few shortcomings to using KCL: 1. The method for accessing the shards assigned to a Worker instance is private. 2. The logic for where a Worker instance continues reading from a shard after restore is deep in the KCL code and cannot be externally configured; it is hardcoded to read from the KCL-managed checkpoint table (the DynamoDB "lease table"). These two problems eventually led me to the conclusion that KCL cannot work with Flink's checkpoint and restore mechanics. I've been looking through the KCL source code for ways to bypass the above issues, but have come up short. Therefore, AWS SDK is the way to go.
Although I will have to redo quite a bit of work that the KCL has already covered, I think it is worth the effort, given that the result would be the only processing engine for Kinesis currently available with built-in exactly-once guarantees. Tasks that we will have to work on include: Task 1: Shard-to-task assignment for parallel data consumption. Task 2: Assigning multiple streams to the consumer (this is actually still a wish-list feature for KCL [1]). Task 3: Handling Kinesis-side resharding when Kinesis shards are merged or split. Task 4: If in the future Flink supports dynamic reconfiguration of task parallelism (perhaps for higher throughput), we will also need to handle Flink-side parallelism changes. I propose that the scope of this JIRA focus only on tasks 1 and 2. A lot of parts can be based on the Kafka connector implementation. Task 3 involves extensive work that isn't necessary for Kinesis to work properly with Flink, and is out of the scope of this ticket. Task 4 is blocked by the availability of dynamic task parallelism reconfiguration at runtime. Ref: [1] https://github.com/awslabs/amazon-kinesis-client-nodejs/issues/4
[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15091050#comment-15091050 ] Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 1/10/16 1:56 PM: - The work is currently in progress; I should be ready to open a pull request within a week.
[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15091050#comment-15091050 ] Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 1/10/16 1:56 PM: - The work is currently in progress; I should be able to open a pull request within a week.
[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15091050#comment-15091050 ] Tzu-Li (Gordon) Tai commented on FLINK-3211: The work is currently in progress; I will probably be able to open a pull request within a week.
> Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics
> --
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Affects Versions: 1.0.0
> Reporter: Tzu-Li (Gordon) Tai
> Original Estimate: 336h
> Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a cloud-service version of Apache Kafka. Support for AWS Kinesis will be a great addition to Flink's handful of streaming connectors to external systems and a great reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: the low-level AWS SDK [1], or the high-level KCL (Kinesis Client Library) [2]. The AWS SDK can be used to consume Kinesis data, including reading a stream beginning from a specific offset (or "record sequence number" in Kinesis terminology). On the other hand, AWS officially recommends using KCL, which offers a higher level of abstraction that also comes with checkpointing and failure recovery, using a KCL-managed AWS DynamoDB "lease table" as the checkpoint state storage.
> However, KCL is essentially a stream processing library that wraps all the partition-to-task (or "shard" in Kinesis terminology) determination and checkpointing, letting the user focus only on streaming application logic. This leads to the understanding that we cannot use the KCL to implement the Kinesis streaming connector if we are aiming for a deep integration of Flink with Kinesis that provides exactly-once guarantees (KCL promises only at-least-once).
> Therefore, AWS SDK will be the way to go for the implementation of this feature.
> With the ability to read from specific offsets, and the fact that Kinesis and Kafka share a lot of similarities, the basic principles of the implementation of Flink's Kinesis streaming connector will very much resemble the Kafka connector. We can basically follow the outlines described in [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector implementation [4]. A few tweaks due to Kinesis vs. Kafka differences are described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I currently don't think this is a good idea for Kinesis streams (a Kinesis stream is logically equivalent to a Kafka topic). Kinesis streams can exist in different AWS regions, and each Kinesis stream under the same AWS user account may have completely independent access settings with different authorization keys. Overall, a Kinesis stream feels like a much more consolidated resource than a Kafka topic. It would be great to hear more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast to the Kafka connector's per-broker connections, where each connection can handle multiple Kafka partitions, the Kinesis connector will only need simple per-shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we were to implement this feature like the Kafka connector, which syncs an outside view of progress with Kafka / ZK, we could probably use ZK or DynamoDB the way KCL does. More thoughts on this part will be very helpful too.
> As for the Kinesis sink, it should be possible to use the AWS KPL (Kinesis Producer Library) [5]. However, for higher code consistency with the proposed Kinesis consumer, I think it will be better to stick with the AWS SDK for the implementation. The implementation should be straightforward, being almost if not completely the same as the Kafka sink.
> References:
> [1] http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
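The per-shard connection model in point 2 above, combined with exactly-once checkpointing, suggests that each consumer subtask would track the last-read sequence number per shard and expose that map as checkpoint state, resuming after restore with a shard iterator of type AFTER_SEQUENCE_NUMBER. A minimal, self-contained sketch with hypothetical class names; the stubbed record batch stands in for the AWS SDK's GetShardIterator/GetRecords calls:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of per-shard sequence-number tracking for checkpointing.
// The Record stub stands in for records returned by the AWS SDK's GetRecords.
public class PerShardConsumerSketch {

    /** Minimal stand-in for a Kinesis record: (sequenceNumber, data). */
    public static final class Record {
        public final String sequenceNumber;
        public final String data;
        public Record(String sequenceNumber, String data) {
            this.sequenceNumber = sequenceNumber;
            this.data = data;
        }
    }

    // shardId -> last sequence number read. This map is what a Flink
    // checkpoint would snapshot, and what restore would hand back.
    private final Map<String, String> lastSequenceNumbers = new HashMap<>();

    /** Consume one fetched batch for a shard, advancing its sequence marker. */
    public void consumeBatch(String shardId, List<Record> batch) {
        for (Record r : batch) {
            // ... emit r.data downstream here ...
            lastSequenceNumbers.put(shardId, r.sequenceNumber);
        }
    }

    /**
     * State a checkpoint would capture. On restore, each shard would be
     * resumed with a shard iterator of type AFTER_SEQUENCE_NUMBER at the
     * recorded value, which is what gives the exactly-once source semantics.
     */
    public Map<String, String> snapshotState() {
        return new HashMap<>(lastSequenceNumbers);
    }
}
```

The real implementation would wire snapshotState() into Flink's checkpointed-state interfaces; this sketch only shows the bookkeeping that replaces KCL's DynamoDB lease table.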
[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector with deep integration with Flink's checkpointing mechanics
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15090974#comment-15090974 ] Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 1/10/16 10:42 AM: -- Re-clarification of why using the KCL is not viable if we want a deep integration of Kinesis with Flink's checkpointing & distributed snapshot mechanics for exactly-once guarantees: After a closer look at KCL's documentation and its source code, I realized that KCL's own design for parallel consumption of Kinesis data works much like our Flink Kafka connector as a whole: each instance of a KCL application is a "worker", which consumes records from the multiple Kinesis "shards" (partitions) assigned to it in parallel threads. KCL handles which shards are assigned to which workers, and can do this dynamically, in the sense that when new worker instances are discovered, shard-to-worker assignments can be reassigned at runtime. Therefore, it seems reasonable to view all Flink consumer tasks of the Kinesis connector as the instance pool of a single KCL application. The implementation would simply instantiate and run a KCL worker on each consumer task, and KCL would handle the shard-to-worker assignment after each consumer task starts running. Unfortunately, there are a few shortcomings to using KCL: 1. The method for accessing the shards assigned to a Worker instance is private. 2. The logic for where a Worker instance continues reading from a shard after restore is deep in the KCL code and cannot be externally configured; it is hardcoded to read from the KCL-managed checkpoint table (the DynamoDB "lease table"). These two problems eventually led me to the conclusion that KCL cannot work with Flink's checkpoint and restore mechanics. I've been looking through the KCL source code for ways to bypass the above issues, but have come up short. Therefore, AWS SDK is the way to go.
Although I will have to redo quite a bit of work that the KCL has already covered, I think it is worth the effort, given that the result would be the only processing engine for Kinesis currently available with built-in exactly-once guarantees. Tasks that we will have to work on include: Task 1: Shard-to-task assignment for parallel data consumption. Task 2: Assigning multiple streams to the consumer (this is actually still a wish-list feature for KCL [1]). Task 3: Handling Kinesis-side resharding when Kinesis shards are merged or split. Task 4: If in the future Flink supports dynamic reconfiguration of task parallelism (perhaps for higher throughput), we will also need to handle Flink-side parallelism changes. I propose that the scope of this JIRA focus only on tasks 1 and 2. Task 3 involves extensive work that isn't necessary for Kinesis to work properly with Flink, and is out of the scope of this ticket. Task 4 is blocked by the availability of dynamic task parallelism reconfiguration at runtime. Ref: [1] https://github.com/awslabs/amazon-kinesis-client-nodejs/issues/4
[jira] [Commented] (FLINK-3093) Introduce annotations for interface stability
[ https://issues.apache.org/jira/browse/FLINK-3093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15090971#comment-15090971 ] ASF GitHub Bot commented on FLINK-3093: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1428#issuecomment-170333018 Thanks for the update! I'd like to do another pass over the DataSet Java and Scala classes next week. > Introduce annotations for interface stability > - > > Key: FLINK-3093 > URL: https://issues.apache.org/jira/browse/FLINK-3093 > Project: Flink > Issue Type: New Feature > Components: Build System >Affects Versions: 1.0.0 >Reporter: Robert Metzger >Assignee: Robert Metzger >Priority: Blocker > > For the upcoming 1.0 release, we want to mark interfaces as public/stable so > that we can automatically ensure that newer Flink releases (1.1, 1.2, ..) are > not breaking them.
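A stability marker of the kind FLINK-3093 describes can be as small as a runtime-retained annotation that an audit tool inspects reflectively to diff public signatures across releases. A hedged sketch with hypothetical names, not the annotations actually proposed in the pull request:

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical sketch of an interface-stability marker. An audit tool can
// collect all annotated types and verify that a newer release has not
// removed or changed their public members.
public class StabilityAnnotationSketch {

    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME) // retained at runtime so tools can see it
    public @interface PublicStable {}

    /** Example of an API type that promises stability across 1.x releases. */
    @PublicStable
    public interface ExampleStableApi {
        int doWork(int input);
    }

    /** True if the given type is marked as a stable public API. */
    public static boolean isStable(Class<?> clazz) {
        return clazz.isAnnotationPresent(PublicStable.class);
    }
}
```

The same reflective check is what an automated compatibility gate in the build could run over every shipped class.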
[GitHub] flink pull request: [FLINK-3093] Introduce annotations for interfa...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1428#issuecomment-170333018 Thanks for the update! I'd like to do another pass over the DataSet Java and Scala classes next week. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3189) Error while parsing job arguments passed by CLI
[ https://issues.apache.org/jira/browse/FLINK-3189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15091100#comment-15091100 ] ASF GitHub Bot commented on FLINK-3189: --- Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/1493#issuecomment-170362531 Done. Failing Travis is unrelated. > Error while parsing job arguments passed by CLI > --- > > Key: FLINK-3189 > URL: https://issues.apache.org/jira/browse/FLINK-3189 > Project: Flink > Issue Type: Bug > Components: Command-line client >Affects Versions: 0.10.1 >Reporter: Filip Leczycki >Assignee: Matthias J. Sax >Priority: Minor > > Flink CLI treats job arguments provided in format "-" as its own > parameters, which results in errors in execution. > Example 1: > call: >bin/flink info myJarFile.jar -f flink -i -m 1 > error: Unrecognized option: -f > Example 2: > Job myJarFile.jar is uploaded to web submission client, flink parameter box > is empty > program arguments box: -f flink -i -m 1 > error: > An unexpected error occurred: > Unrecognized option: -f > org.apache.flink.client.cli.CliArgsException: Unrecognized option: -f > at > org.apache.flink.client.cli.CliFrontendParser.parseInfoCommand(CliFrontendParser.java:296) > at org.apache.flink.client.CliFrontend.info(CliFrontend.java:376) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:983) > at > org.apache.flink.client.web.JobSubmissionServlet.doGet(JobSubmissionServlet.java:171) > at javax.servlet.http.HttpServlet.service(HttpServlet.java:734) > at javax.servlet.http.HttpServlet.service(HttpServlet.java:847) > at > org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:532) > at > org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:453) > at > org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:227) > at > org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:965) > at > 
org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:388) > at > org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:187) > at > org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:901) > at > org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:117) > at > org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:47) > at > org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:113) > at org.eclipse.jetty.server.Server.handle(Server.java:348) > at > org.eclipse.jetty.server.HttpConnection.handleRequest(HttpConnection.java:596) > at > org.eclipse.jetty.server.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:1048) > at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:549) > at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:211) > at > org.eclipse.jetty.server.HttpConnection.handle(HttpConnection.java:425) > at > org.eclipse.jetty.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:489) > at > org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:436) > at java.lang.Thread.run(Thread.java:745) > Execution of > >bin/flink run myJarFile.jar -f flink -i -m 1 > works perfectly fine
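One way to avoid this class of error is to stop consuming CLI options at the first non-option token (the jar file) and hand everything after it to the job untouched. A minimal sketch of that splitting rule, with hypothetical names and no claim about how the actual CLI frontend is structured:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: split argv so that options before the jar belong to
// the CLI, and the jar plus everything after it belongs to the job. This way
// job arguments like "-f" are never parsed as CLI options.
public class CliArgSplitSketch {

    /** Returns two lists: index 0 = CLI options before the jar, index 1 = jar + job args. */
    public static List<List<String>> split(String[] args) {
        List<String> cliOptions = new ArrayList<>();
        List<String> jobArgs = new ArrayList<>();
        boolean sawJar = false;
        for (String arg : args) {
            if (!sawJar && !arg.startsWith("-")) {
                sawJar = true; // first non-option token is taken to be the jar file
            }
            (sawJar ? jobArgs : cliOptions).add(arg);
        }
        return Arrays.asList(cliOptions, jobArgs);
    }
}
```

With this rule, "bin/flink info myJarFile.jar -f flink -i -m 1" would leave "-f flink -i -m 1" entirely to the job, matching the behavior the reporter observed for "bin/flink run".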
[GitHub] flink pull request: [FLINK-3189] Error while parsing job arguments...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/1493#issuecomment-170362531 Done. Failing Travis is unrelated.
[jira] [Created] (FLINK-3212) JobManagerCheckpointRecoveryITCase
Matthias J. Sax created FLINK-3212: -- Summary: JobManagerCheckpointRecoveryITCase Key: FLINK-3212 URL: https://issues.apache.org/jira/browse/FLINK-3212 Project: Flink Issue Type: Bug Components: Tests Reporter: Matthias J. Sax Priority: Critical {noformat} Tests in error: JobManagerCheckpointRecoveryITCase.testCheckpointRecoveryFailure:354 » IllegalState JobManagerCheckpointRecoveryITCase.testCheckpointedStreamingSumProgram:192 » IO {noformat} https://travis-ci.org/mjsax/flink/jobs/101407273
[jira] [Commented] (FLINK-3109) Join two streams with two different buffer time
[ https://issues.apache.org/jira/browse/FLINK-3109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15091162#comment-15091162 ] Stephan Ewen commented on FLINK-3109: - [~yangjun.wan...@gmail.com] How is your experience with that implementation? Do you think it is in a state that it could be contributed? > Join two streams with two different buffer time > --- > > Key: FLINK-3109 > URL: https://issues.apache.org/jira/browse/FLINK-3109 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.10.1 >Reporter: Wang Yangjun > Labels: easyfix, patch > Fix For: 0.10.2 > > Original Estimate: 48h > Remaining Estimate: 48h > > Current Flink streaming only supports joining two streams on the same window. How can this be solved? > For example, there are two streams. One is advertisements shown to users, with tuples that could be described as (id, shown timestamp). The other is the click stream -- (id, clicked timestamp). We want a joined stream that includes every advertisement clicked by a user within 20 minutes of being shown. > It is possible that after an advertisement is shown, some users click it immediately. It is also possible that the "click" message arrives at the server earlier than the "show" message because of Internet delay. We assume that the maximum delay is one minute. > The need, then, is to always keep a buffer (20 mins) of the "show" stream and another buffer (1 min) of the "click" stream. > It would be great to have an API like: > showStream.join(clickStream) > .where(keySelector) > .buffer(Time.of(20, TimeUnit.MINUTES)) > .equalTo(keySelector) > .buffer(Time.of(1, TimeUnit.MINUTES)) > .apply(JoinFunction) > http://stackoverflow.com/questions/33849462/how-to-avoid-repeated-tuples-in-flink-slide-window-join/34024149#34024149
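The asymmetric buffering proposed above can be sketched with two key-matched, time-evicted buffers, one per stream. A self-contained illustration with hypothetical names, processing-time eviction, and none of Flink's actual windowing machinery:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the proposed asymmetric-buffer join: each stream
// keeps its own time-bounded buffer (e.g. 20 min for "show", 1 min for
// "click"), and an arriving element joins against the other stream's buffer.
public class DualBufferJoinSketch {

    /** Minimal keyed, timestamped event. */
    public static final class Event {
        public final String key;
        public final long timestamp;
        public Event(String key, long timestamp) { this.key = key; this.timestamp = timestamp; }
    }

    private final List<Event> showBuffer = new ArrayList<>();
    private final List<Event> clickBuffer = new ArrayList<>();
    private final long showRetentionMs;
    private final long clickRetentionMs;

    public DualBufferJoinSketch(long showRetentionMs, long clickRetentionMs) {
        this.showRetentionMs = showRetentionMs;
        this.clickRetentionMs = clickRetentionMs;
    }

    /** A "show" arrives: buffer it and join against still-buffered early clicks. */
    public List<String> onShow(Event show, long now) {
        evict(clickBuffer, now, clickRetentionMs);
        showBuffer.add(show);
        return matches(clickBuffer, show.key);
    }

    /** A "click" arrives: join against shows still within their 20-minute window. */
    public List<String> onClick(Event click, long now) {
        evict(showBuffer, now, showRetentionMs);
        clickBuffer.add(click);
        return matches(showBuffer, click.key);
    }

    private static List<String> matches(List<Event> buffer, String key) {
        List<String> joined = new ArrayList<>();
        for (Event e : buffer) {
            if (e.key.equals(key)) joined.add(key);
        }
        return joined;
    }

    private static void evict(List<Event> buffer, long now, long retentionMs) {
        Iterator<Event> it = buffer.iterator();
        while (it.hasNext()) {
            if (now - it.next().timestamp > retentionMs) it.remove();
        }
    }
}
```

The short click buffer is exactly what covers the "click arrives before show" case described in the issue.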
[jira] [Assigned] (FLINK-3213) Union of two streams with different parallelism should adjust parallelism automatically
[ https://issues.apache.org/jira/browse/FLINK-3213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Suneel Marthi reassigned FLINK-3213:
------------------------------------

    Assignee: Suneel Marthi

> Union of two streams with different parallelism should adjust parallelism automatically
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-3213
>                 URL: https://issues.apache.org/jira/browse/FLINK-3213
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 0.10.1
>            Reporter: Stephan Ewen
>            Assignee: Suneel Marthi
>             Fix For: 1.0.0
>
> Currently, attempting to take the union of two streams with different parallelism does not work; the operation is rejected. One needs to call rebalance() manually to adjust the parallelism.
> The rebalance() should be inserted automatically.
> Here is an example that shows how to reproduce this:
> https://gist.github.com/smarthi/f49f387008f333c0c434
[jira] [Created] (FLINK-3213) Union of two streams with different parallelism should adjust parallelism automatically
Stephan Ewen created FLINK-3213:
--------------------------------

             Summary: Union of two streams with different parallelism should adjust parallelism automatically
                 Key: FLINK-3213
                 URL: https://issues.apache.org/jira/browse/FLINK-3213
             Project: Flink
          Issue Type: Improvement
          Components: Streaming
    Affects Versions: 0.10.1
            Reporter: Stephan Ewen
             Fix For: 1.0.0

Currently, attempting to take the union of two streams with different parallelism does not work; the operation is rejected. One needs to call rebalance() manually to adjust the parallelism.

The rebalance() should be inserted automatically.

Here is an example that shows how to reproduce this:
https://gist.github.com/smarthi/f49f387008f333c0c434
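The manual workaround mentioned above is to insert rebalance() on one input before the union (e.g. something like a.union(b.rebalance()) in the DataStream API), which redistributes records round-robin across the downstream parallel instances. A minimal plain-Java sketch of that round-robin redistribution follows; the helper name and types are illustrative, not Flink code.

```java
import java.util.*;

public class RoundRobinRebalance {
    // Redistribute a stream of records across `downstreamParallelism`
    // partitions in round-robin order -- the same policy Flink's
    // rebalance() applies between two operators of different parallelism.
    static List<List<String>> rebalance(List<String> records, int downstreamParallelism) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < downstreamParallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        int next = 0;
        for (String record : records) {
            partitions.get(next).add(record);
            next = (next + 1) % downstreamParallelism; // advance round-robin
        }
        return partitions;
    }
}
```

Inserting this step automatically, as the issue proposes, would make unions of differently-parallel streams just work without the explicit call.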
[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15091159#comment-15091159 ]

Stephan Ewen commented on FLINK-3211:
-------------------------------------

Awesome news. A Kinesis connector would be super nice indeed! Looking forward to the pull request!

> Add AWS Kinesis streaming connector
> -----------------------------------
>
>                 Key: FLINK-3211
>                 URL: https://issues.apache.org/jira/browse/FLINK-3211
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming Connectors
>    Affects Versions: 1.0.0
>            Reporter: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a cloud-service version of Apache Kafka. Support for AWS Kinesis would be a great addition to Flink's handful of streaming connectors to external systems, and a great way to reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: the low-level AWS SDK [1] and the high-level KCL (Kinesis Client Library) [2]. The AWS SDK can be used to consume Kinesis data, including reading a stream from a specific offset (a "record sequence number" in Kinesis terminology). On the other hand, AWS officially recommends using the KCL, which offers a higher level of abstraction and also comes with checkpointing and failure recovery, using a KCL-managed AWS DynamoDB "lease table" as the checkpoint state storage.
> However, the KCL is essentially a stream processing library that wraps all the partition-to-task (or "shard", in Kinesis terminology) assignment and checkpointing, so that the user can focus only on the streaming application logic. This means we cannot use the KCL to implement the Kinesis streaming connector if we are aiming for a deep integration of Flink with Kinesis that provides exactly-once guarantees (the KCL promises only at-least-once). Therefore, the AWS SDK is the way to go for the implementation of this feature.
> With the ability to read from specific offsets, and given that Kinesis and Kafka share a lot of similarities, the basic principles of the implementation of Flink's Kinesis streaming connector will closely resemble the Kafka connector. We can largely follow the outline in [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector implementation [4]. A few tweaks due to differences between Kinesis and Kafka are described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I currently don't think this is a good idea for Kinesis streams (a Kinesis stream is logically equivalent to a Kafka topic). Kinesis streams can exist in different AWS regions, and each Kinesis stream under the same AWS user account may have completely independent access settings with different authorization keys. Overall, a Kinesis stream feels like a much more consolidated resource than a Kafka topic. It would be great to hear more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only partitioning abstraction for AWS Kinesis is the "shard". Therefore, in contrast to the Kafka connector's per-broker connections, where one connection can handle multiple Kafka partitions, the Kinesis connector only needs simple per-shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we were to implement this feature like the Kafka connector, which uses Kafka / ZK to sync an outside view of progress, we could probably use ZK or DynamoDB the way the KCL does. More thoughts on this part would be very helpful too.
> As for the Kinesis sink, it should be possible to use the AWS KPL (Kinesis Producer Library) [5]. However, for higher code consistency with the proposed Kinesis consumer, I think it is better to stick with the AWS SDK for the implementation. The implementation should be straightforward, being almost if not completely the same as the Kafka sink.
> References:
> [1] http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] http://docs.aws.amazon.com/kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998
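The per-shard connection model described in point 2 of the quoted description boils down to a simple iterator loop per shard: obtain a shard iterator, then repeatedly fetch a batch of records and advance to the iterator returned with each batch, until the shard is exhausted. The self-contained sketch below illustrates that loop; KinesisLike is a stand-in for the AWS SDK's GetShardIterator / GetRecords calls, not the real API.

```java
import java.util.*;

public class ShardReadLoop {
    // Stand-in for the AWS SDK's per-shard calls (GetShardIterator / GetRecords).
    interface KinesisLike {
        String getShardIterator(String shardId, String afterSequenceNumber);
        Batch getRecords(String iterator);
    }

    // One batch of records plus the iterator for the next call; a null
    // next iterator means the shard is closed and fully consumed.
    static class Batch {
        final List<String> records;
        final String nextIterator;
        Batch(List<String> records, String nextIterator) {
            this.records = records;
            this.nextIterator = nextIterator;
        }
    }

    // Drain one shard starting from the given sequence number (null = from
    // the start). One such loop per shard is all the connector needs.
    static List<String> readShard(KinesisLike client, String shardId, String startSeq) {
        List<String> out = new ArrayList<>();
        String it = client.getShardIterator(shardId, startSeq);
        while (it != null) {
            Batch batch = client.getRecords(it);
            out.addAll(batch.records);
            it = batch.nextIterator;
        }
        return out;
    }
}
```

Unlike the Kafka connector's per-broker connections, nothing here multiplexes partitions: each shard owns its own iterator chain.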
[GitHub] flink pull request: [Storm-Compatibility] Forward Storm Kryo regis...
GitHub user mjsax opened a pull request:

    https://github.com/apache/flink/pull/1495

    [Storm-Compatibility] Forward Storm Kryo registrations to Flink

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mjsax/flink storm-kryo

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1495.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1495

----
commit 4b4d2b8592d2a26c76cdf552a1faddf780d4e75d
Author: mjsax
Date:   2016-01-10T15:59:37Z

    [Storm-Compatibility] Forward Storm Kryo registrations to Flink

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15091295#comment-15091295 ]

Stephan Ewen commented on FLINK-3211:
-------------------------------------

Concerning your questions: I think going with the low-level SDK seems reasonable.

In Kafka/Flink, committing offsets back is mostly for outside tools to see where in the stream the consumer is, plus so that a new program does not always start from the beginning. It is probably fine not to worry about that for now.

It would be good if the implementation let the user decide where in the stream to start reading:
  - latest record sequence number (default)
  - earliest record sequence number
  - a specific sequence number
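The three starting positions Stephan lists map naturally onto a small consumer configuration. The sketch below is a hypothetical illustration of resolving each option to a starting offset within a shard; the class and method names are made up, though the comments note the real Kinesis shard-iterator types each case corresponds to.

```java
import java.util.*;

public class StartPosition {
    enum Type { LATEST, EARLIEST, AT_SEQUENCE_NUMBER }

    // Given the sequence numbers currently retained in a shard (oldest
    // first), return the index of the first record the consumer should read.
    static int resolve(List<String> sequenceNumbers, Type type, String startSeq) {
        switch (type) {
            case EARLIEST:
                return 0;                       // Kinesis iterator type: TRIM_HORIZON
            case LATEST:
                return sequenceNumbers.size();  // Kinesis: LATEST -- only records arriving from now on
            case AT_SEQUENCE_NUMBER:
                int i = sequenceNumbers.indexOf(startSeq); // Kinesis: AT_SEQUENCE_NUMBER
                if (i < 0) {
                    throw new IllegalArgumentException("unknown sequence number: " + startSeq);
                }
                return i;
            default:
                throw new AssertionError("unreachable");
        }
    }
}
```

Defaulting to LATEST matches the suggestion above, while AT_SEQUENCE_NUMBER is what a restore from a Flink checkpoint would use.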