[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-07-07 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367080#comment-15367080
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3231:


:) No problem, thank you for merging!

> Handle Kinesis-side resharding in Kinesis streaming consumer
> 
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time to adjust the 
> stream's throughput capacity. This article explains it quite clearly: 
> https://brandur.org/kinesis-by-example.
> This breaks the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). That static mapping is a 
> simple round-robin-like distribution that each Flink consumer task can 
> determine locally (the Flink Kafka consumer does this too).
> To handle Kinesis resharding, we need some way for the Flink consumer tasks 
> to coordinate which shards they are currently handling, and to let a task 
> ask the coordinator for a shard reassignment when it encounters a closed 
> shard at runtime (Kinesis closes shards when they are merged or split).
> We need a centralized coordinator state store that is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine which 
> shards they can be reassigned. Amazon's KCL uses a DynamoDB table for this 
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't 
> build the consumer on KCL if we want to leverage Flink's checkpointing 
> mechanics. For our own implementation, ZooKeeper could serve as this state 
> store, but that would require users to set up ZK for the connector to work.
> Since this feature involves extensive work, it is opened as a separate 
> sub-task of the basic implementation, 
> https://issues.apache.org/jira/browse/FLINK-3229.
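The locally determinable round-robin distribution mentioned in the description can be sketched as follows. This is a minimal illustration with hypothetical names, not the connector's actual code: each subtask computes its own subset of a globally ordered shard list with no coordination.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a static round-robin shard-to-task mapping (illustrative names):
// shard i is owned by subtask (i % parallelism), so every subtask can derive
// its own subset locally, without any coordinator.
public class StaticShardAssignment {

	/** Returns the shards owned by the given subtask index. */
	public static List<String> shardsForSubtask(
			List<String> allShards, int subtaskIndex, int parallelism) {
		List<String> owned = new ArrayList<>();
		for (int i = 0; i < allShards.size(); i++) {
			if (i % parallelism == subtaskIndex) {
				owned.add(allShards.get(i));
			}
		}
		return owned;
	}
}
```

Note that such a mapping is only stable while the shard list is static; once resharding closes old shards and opens new ones, the indices shift, which is exactly the problem this issue addresses.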



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-07-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15362978#comment-15362978
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2131




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-07-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15362845#comment-15362845
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2131
  
I'm going to merge this once travis gives me the green light ;)




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359165#comment-15359165
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2131
  
Thank you for the quick fix. I hope I can take a look tonight. Otherwise, 
I'll look at it early next week.
Thanks a lot for addressing my comments so quickly.




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359105#comment-15359105
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2131
  
@rmetzger Thank you for your review. I hope I've addressed your last 
comments with the last commit.

For the documentation, I added a bit more beyond the threading model: 
1) enabling checkpointing, mostly borrowed from the Kafka documentation, and 
2) information on how the consumer internally uses the Kinesis APIs, so that 
users can make sense of any limit warnings they see in the logs.

Please let me know if there's anything else to address!




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358982#comment-15358982
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r69300764
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
@@ -17,157 +17,553 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Map;
-import java.util.HashMap;
+
+import java.util.LinkedList;
 import java.util.List;
-import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards.
- * The fetcher spawns a single thread for connection to each shard.
+ * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards. Each parallel subtask instantiates
+ * and runs a single fetcher throughout the subtask's lifetime. The fetcher accomplishes the following:
+ * <ul>
+ *     <li>1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset
+ *         of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be
+ *         subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe
+ *         to the same subset of shards even after restoring)</li>
+ *     <li>2. decide where in each discovered shard should the fetcher start subscribing to</li>
+ *     <li>3. subscribe to shards by creating a single thread for each shard</li>
+ * </ul>
+ *
+ * <p>The fetcher manages two states: 1) last seen shard ids of each subscribed stream (used for continuous shard discovery),
+ * and 2) last processed sequence numbers of each subscribed shard. Since operations on the second state will be performed
+ * by multiple threads, these operations should only be done using the handler methods provided in this class.
  */
-public class KinesisDataFetcher {
+public class KinesisDataFetcher<T> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);
 
-	/** Config properties for the Flink Kinesis Consumer */
+	// ------------------------------------------------------------------------
+	//  Consumer-wide settings
+	// ------------------------------------------------------------------------
+
+	/** Configuration properties for the Flink Kinesis Consumer */
 	private final Properties configProps;
 
-	/** The name of the consumer task that this fetcher was instantiated */
-	private final String taskName;
+	/** The list of Kinesis streams that the consumer is subscribing to */
+	private final List<String> streams;
+
+	/**
+	 * The deserialization schema we will be using to convert Kinesis records to Flink objects.
+	 * Note that since this might not be thread-safe, {@link ShardConsumer}s using this must
+	
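The "non-overlapping across subtasks and determinate across restores" property described in the Javadoc of the diff above can be obtained by deciding ownership as a pure function of the shard id. The sketch below is an assumed illustration of that idea (the class and method names are hypothetical, not the connector's actual API):

```java
// Sketch: if shard ownership is a pure function of the shard id, then shard
// discovery needs no coordination, no two subtasks claim the same shard, and
// a restored subtask re-derives exactly the same subset. Hypothetical names.
public class ShardAssignmentSketch {

	/** True if the subtask with the given index should subscribe to this shard. */
	public static boolean shouldSubscribeTo(String shardId, int parallelism, int subtaskIndex) {
		// floorMod keeps the result in [0, parallelism) even for negative hash codes
		return Math.floorMod(shardId.hashCode(), parallelism) == subtaskIndex;
	}
}
```

Because the decision depends only on the shard id and the (fixed) parallelism, newly discovered child shards from a reshard are picked up by exactly one subtask without any coordinator round-trip.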

[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358979#comment-15358979
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r69300586
  

[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358977#comment-15358977
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r69300348
  

[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358969#comment-15358969
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r69299251
  

[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358968#comment-15358968
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r69299145
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
@@ -17,157 +17,553 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Map;
-import java.util.HashMap;
+
+import java.util.LinkedList;
 import java.util.List;
-import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards.
- * The fetcher spawns a single thread for connection to each shard.
+ * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards. Each parallel subtask instantiates
+ * and runs a single fetcher throughout the subtask's lifetime. The fetcher accomplishes the following:
+ * 1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset
+ *    of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be
+ *    subscribed to the same shard) and deterministic across subtask restores (the subtask will always subscribe
+ *    to the same subset of shards even after restoring)
+ * 2. decide where in each discovered shard the fetcher should start subscribing to
+ * 3. subscribe to shards by creating a single thread for each shard
+ *
+ * The fetcher manages two states: 1) the last seen shard ids of each subscribed stream (used for continuous shard discovery),
+ * and 2) the last processed sequence numbers of each subscribed shard. Since operations on the second state will be performed
+ * by multiple threads, these operations should only be done using the handler methods provided in this class.
  */
-public class KinesisDataFetcher {
+public class KinesisDataFetcher<T> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);
 
-	/** Config properties for the Flink Kinesis Consumer */
+	// ------------------------------------------------------------------------
+	//  Consumer-wide settings
+	// ------------------------------------------------------------------------
+
+	/** Configuration properties for the Flink Kinesis Consumer */
 	private final Properties configProps;
 
-	/** The name of the consumer task that this fetcher was instantiated */
-	private final String taskName;
+	/** The list of Kinesis streams that the consumer is subscribing to */
+	private final List<String> streams;
+
+	/**
+	 * The deserialization schema we will be using to convert Kinesis records to Flink objects.
+	 * Note that since this might not be thread-safe, {@link ShardConsumer}s using this must
+
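
The non-overlapping, deterministic shard assignment described in the Javadoc above can be sketched as a simple hash-based rule that every subtask evaluates locally, with no coordination. This is an illustration under assumed names; the connector hashes its shard metadata object rather than the raw shard id string:

```java
import java.util.List;

public class ShardAssignmentSketch {
    /** A subtask subscribes to a shard iff the shard's hash maps to the
     *  subtask's index. The rule is locally computable and stable across
     *  restores, so no central coordinator is needed. (Illustrative only.) */
    static boolean shouldSubscribeTo(String shardId, int totalSubtasks, int subtaskIndex) {
        return Math.abs(shardId.hashCode() % totalSubtasks) == subtaskIndex;
    }

    public static void main(String[] args) {
        List<String> shards = List.of(
            "shardId-000000000000", "shardId-000000000001",
            "shardId-000000000002", "shardId-000000000003");
        int parallelism = 2;
        for (String shard : shards) {
            int owners = 0;
            for (int i = 0; i < parallelism; i++) {
                if (shouldSubscribeTo(shard, parallelism, i)) {
                    owners++;
                }
            }
            // Non-overlapping and complete: exactly one subtask owns each shard.
            if (owners != 1) {
                throw new AssertionError(shard + " owned by " + owners + " subtasks");
            }
        }
        System.out.println("each shard is owned by exactly one subtask");
    }
}
```

Because newly discovered shards are run through the same rule, resharding never requires reassigning shards between subtasks.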

[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358962#comment-15358962
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r69298876
  
--- Diff: docs/apis/streaming/connectors/kinesis.md ---
@@ -60,10 +60,10 @@ to setup Kinesis streams. Make sure to create the 
appropriate IAM policy and use
 
 ### Kinesis Consumer
--- End diff --

Good point! I'll add this.


> Handle Kinesis-side resharding in Kinesis streaming consumer
> 
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time for adjustable 
> stream throughput capacity. This article explains this quite clearly: 
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
> mapping is done in a simple round-robin-like distribution which can be 
> locally determined at each Flink consumer task (Flink Kafka consumer does 
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer 
> tasks coordinate which shards they are currently handling, and allow the 
> tasks to ask the coordinator for a shard reassignment when a task finds a 
> closed shard at runtime (shards are closed by Kinesis when they are merged or 
> split).
> We need a centralized coordinator state store which is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine what 
> shards it can be reassigned. Amazon KCL uses a DynamoDB table for the 
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
> KCL for the implementation of the consumer if we want to leverage Flink's 
> checkpointing mechanics. For our own implementation, Zookeeper can be used 
> for this state store, but that means it would require the user to set up ZK 
> to work.
> Since this feature introduces extensive work, it is opened as a separate 
> sub-task from the basic implementation 
> https://issues.apache.org/jira/browse/FLINK-3229.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358927#comment-15358927
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2131
  
I tested the code, also with a shard merge:
`aws kinesis merge-shards --shard-to-merge shardId-0001 --adjacent-shard-to-merge shardId-0002 --stream-name flink-test`

and everything worked nicely; the log statements were good.
If you want to improve it a little bit, we should maybe log at debug level 
each time we discover new shards (just to show that everything is working as 
expected).

The only thing missing is some minor documentation; then I think we are good 
to merge.
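
The suggested debug-level logging of newly discovered shards could look like the sketch below. Discovery is simulated here and all names are illustrative; the real fetcher would query the Kinesis proxy on its configured discovery interval:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DiscoveryLoopSketch {
    /** Simulated shard listing that grows by one shard per round; a real
     *  fetcher would call ListShards/DescribeStream via the Kinesis proxy. */
    static List<String> listShards(int round) {
        List<String> shards = new ArrayList<>();
        for (int i = 0; i <= round; i++) {
            shards.add("shardId-00000000000" + i);
        }
        return shards;
    }

    public static void main(String[] args) {
        Set<String> seen = new HashSet<>();
        // Poll a few discovery rounds; report only shards not seen before.
        // This is the spot where a LOG.debug(...) statement would go.
        for (int round = 0; round < 3; round++) {
            for (String shard : listShards(round)) {
                if (seen.add(shard)) {
                    System.out.println("discovered new shard: " + shard);
                }
            }
        }
    }
}
```

Tracking the already-seen set keeps the log quiet on the steady state and noisy only when Kinesis actually reshards.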




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358925#comment-15358925
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r69296319
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---

[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358818#comment-15358818
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r69285572
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---

[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358714#comment-15358714
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r69274851
  
--- Diff: docs/apis/streaming/connectors/kinesis.md ---
@@ -60,10 +60,10 @@ to setup Kinesis streams. Make sure to create the 
appropriate IAM policy and use
 
 ### Kinesis Consumer
--- End diff --

I think we should also mention the threading model of the Kinesis consumer 
in the documentation.

Users should know that each parallel Flink instance will constantly query 
Kinesis for shards. So if a user has 5 shards, but runs Flink with a 
parallelism of 50, there will be 50 threads constantly querying Kinesis.

Also, we should explain that there will always be one thread per shard 
running.
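
The threading model described above (one consuming thread per discovered shard) can be illustrated with a small sketch; the helper name and the use of an executor here are assumptions for illustration, not the connector's exact implementation:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PerShardThreadsSketch {
    /** Spawns one consuming task per shard and records which thread handled
     *  which shard. A real shard consumer would loop on GetRecords instead
     *  of handling the shard once. (Sketch only.) */
    static List<String> consumeWithOneThreadPerShard(List<String> shards) throws InterruptedException {
        List<String> handled = new CopyOnWriteArrayList<>();
        ExecutorService shardConsumers = Executors.newFixedThreadPool(shards.size());
        for (String shard : shards) {
            shardConsumers.submit(() ->
                handled.add(Thread.currentThread().getName() + " -> " + shard));
        }
        shardConsumers.shutdown();
        shardConsumers.awaitTermination(5, TimeUnit.SECONDS);
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> shards = List.of("shardId-000000000000", "shardId-000000000001");
        consumeWithOneThreadPerShard(shards).forEach(System.out::println);
    }
}
```

Note that discovery threads run per parallel subtask regardless of how many shards that subtask owns, which is exactly the behavior the comment asks the documentation to call out.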




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358708#comment-15358708
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r69274234
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java ---
@@ -68,10 +97,30 @@
 	//  Default configuration values
 	// ------------------------------------------------------------------------
 
-	public static final int DEFAULT_STREAM_DESCRIBE_RETRY_TIMES = 3;
+	public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L;
+
+	public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L;
+
+	public static final double DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+	public static final int DEFAULT_SHARD_GETRECORDS_MAX = 100;
+
+	public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3;
+
+	public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE = 300L;
+
+	public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX = 1000L;
+
+	public static final double DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
+
+	public static final int DEFAULT_SHARD_GETITERATOR_RETRIES = 3;
+
+	public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE = 300L;
+
+	public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX = 1000L;
 
-	public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF = 1000L;
+	public static final double DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
 
-	public static final int DEFAULT_SHARD_RECORDS_PER_GET = 100;
+	public static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 10000L;
--- End diff --

Okay. 10 seconds is okay as well.
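
The BASE/MAX/EXPONENTIAL_CONSTANT defaults quoted above imply a capped exponential backoff of the form base * constant^attempt; a minimal sketch of that formula, with an assumed method name:

```java
public class BackoffSketch {
    /** Capped exponential backoff as implied by the BASE/MAX/EXPONENTIAL_CONSTANT
     *  default constants: base * constant^attempt, clamped to max. (Sketch only;
     *  the method name is illustrative, not the connector's API.) */
    static long backoffMillis(long base, long max, double expConst, int attempt) {
        return (long) Math.min(max, base * Math.pow(expConst, attempt));
    }

    public static void main(String[] args) {
        // DescribeStream defaults: base=1000, max=5000, constant=1.5
        System.out.println(backoffMillis(1000L, 5000L, 1.5, 0)); // 1000
        System.out.println(backoffMillis(1000L, 5000L, 1.5, 1)); // 1500
        System.out.println(backoffMillis(1000L, 5000L, 1.5, 4)); // 5000 (capped)
    }
}
```

The cap keeps retry latency bounded while the exponent still spaces out repeated failures.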





[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15357072#comment-15357072
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r69130874
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java
 ---
--- End diff --

I'm using 10s for default discovery interval here. I tested it and the 
originally suggested 30s seemed a bit too long as a default, IMHO. Can change 
it back to 30s if you think it's more appropriate.




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15357067#comment-15357067
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r69129641
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
@@ -17,157 +17,553 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
 import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Map;
-import java.util.HashMap;
+
+import java.util.LinkedList;
 import java.util.List;
-import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards.
- * The fetcher spawns a single thread for connection to each shard.
+ * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards. Each parallel subtask instantiates
+ * and runs a single fetcher throughout the subtask's lifetime. The fetcher accomplishes the following:
+ * <ul>
+ *     <li>1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset
+ *         of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be
+ *         subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe
+ *         to the same subset of shards even after restoring)</li>
+ *     <li>2. decide where in each discovered shard should the fetcher start subscribing to</li>
+ *     <li>3. subscribe to shards by creating a single thread for each shard</li>
+ * </ul>
+ *
+ * <p>The fetcher manages two states: 1) last seen shard ids of each subscribed stream (used for continuous shard discovery),
+ * and 2) last processed sequence numbers of each subscribed shard. Since operations on the second state will be performed
+ * by multiple threads, these operations should only be done using the handler methods provided in this class.
  */
-public class KinesisDataFetcher {
+public class KinesisDataFetcher<T> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);
 
-	/** Config properties for the Flink Kinesis Consumer */
+	// ------------------------------------------------------------------------
+	//  Consumer-wide settings
+	// ------------------------------------------------------------------------
+
+	/** Configuration properties for the Flink Kinesis Consumer */
 	private final Properties configProps;
 
-	/** The name of the consumer task that this fetcher was instantiated */
-	private final String taskName;
+	/** The list of Kinesis streams that the consumer is subscribing to */
+	private final List<String> streams;
+
+	/**
+	 * The deserialization schema we will be using to convert Kinesis records to Flink objects.
+	 * Note that since this might not be thread-safe, {@link ShardConsumer}s using this must
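The handler-method pattern described in the javadoc above, where shard-consumer threads update per-shard sequence numbers only through a single accessor, might be sketched roughly as follows (stand-in types; `SequenceNumberState` is a hypothetical name, not the actual KinesisDataFetcher internals):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch of thread-safe per-shard state updates: shard-consumer threads never
// touch the state list directly; they go through a single handler method.
public class SequenceNumberState {

    static final class ShardState {
        final String shardId;
        volatile String lastProcessedSeqNum;
        ShardState(String shardId, String seqNum) {
            this.shardId = shardId;
            this.lastProcessedSeqNum = seqNum;
        }
    }

    private final List<ShardState> subscribedShards = new CopyOnWriteArrayList<>();

    public int registerShard(String shardId, String initialSeqNum) {
        subscribedShards.add(new ShardState(shardId, initialSeqNum));
        return subscribedShards.size() - 1;   // index used by the consumer thread
    }

    // The only way consumer threads update progress; safe for concurrent readers
    // such as a checkpointing thread snapshotting all sequence numbers.
    public void updateState(int shardIndex, String seqNum) {
        subscribedShards.get(shardIndex).lastProcessedSeqNum = seqNum;
    }

    public String lastSequenceNumber(int shardIndex) {
        return subscribedShards.get(shardIndex).lastProcessedSeqNum;
    }
}
```

Funneling all writes through one method keeps the concurrency reasoning in a single place, which is the point the javadoc is making.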

[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356909#comment-15356909
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2131
  
@rmetzger
Just pushed the changes addressing the comments, rebased on the exactly-once 
fix and the user-agent fix.
Sorry, I had some trouble with the rebasing and it took more time than I 
expected.




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355320#comment-15355320
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2131
  
@rmetzger Thanks Robert!
Sorry for the delay, got caught up with other things. Will push within the 
next 2~3 hrs.




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355054#comment-15355054
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2131
  
Okay, I'll merge the exactly once fix now.




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355053#comment-15355053
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68930725
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardDiscoverer.java ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This runnable is in charge of discovering new shards that a fetcher should subscribe to.
+ * It is submitted to {@link KinesisDataFetcher#shardDiscovererAndSubscriberExecutor} and continuously runs until the
+ * fetcher is closed. Whenever it discovers a new shard that should be subscribed to, the shard is added to the
+ * {@link KinesisDataFetcher#pendingShards} queue with initial state, i.e. where in the new shard we should start
+ * consuming from.
+ */
+public class ShardDiscoverer implements Runnable {
--- End diff --

Yes.
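The discovery loop described in the javadoc above could be sketched as below (a simplified illustration under assumed interfaces; `ShardDiscovererSketch` and `ShardLister` are stand-ins, not the real Flink classes):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

// Simplified sketch of a shard-discoverer loop: poll for the shard list at a
// fixed interval, and enqueue any shard not seen before for subscription.
public class ShardDiscovererSketch implements Runnable {

    interface ShardLister {          // stand-in for the Kinesis proxy
        Set<String> listShardIds();
    }

    private final ShardLister proxy;
    private final BlockingQueue<String> pendingShards;
    private final Set<String> seenShards = new HashSet<>();
    private final long intervalMillis;
    private volatile boolean running = true;

    ShardDiscovererSketch(ShardLister proxy, BlockingQueue<String> pendingShards, long intervalMillis) {
        this.proxy = proxy;
        this.pendingShards = pendingShards;
        this.intervalMillis = intervalMillis;
    }

    public void discoverOnce() {
        for (String shardId : proxy.listShardIds()) {
            if (seenShards.add(shardId)) {       // true only for newly seen shards
                pendingShards.offer(shardId);
            }
        }
    }

    @Override
    public void run() {
        while (running) {
            discoverOnce();
            try {
                Thread.sleep(intervalMillis);    // the configurable discovery frequency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public void shutdown() { running = false; }
}
```

The interval is what the review thread below discusses making configurable with a 30-second default.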



[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15355039#comment-15355039
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2131
  
I'll rebase this after #2180 is merged so that this PR reflects the 
exactly-once fix, and push it together with the changes addressing the 
remaining comments.




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352769#comment-15352769
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68733201
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardDiscoverer.java ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This runnable is in charge of discovering new shards that a fetcher should subscribe to.
+ * It is submitted to {@link KinesisDataFetcher#shardDiscovererAndSubscriberExecutor} and continuously runs until the
+ * fetcher is closed. Whenever it discovers a new shard that should be subscribed to, the shard is added to the
+ * {@link KinesisDataFetcher#pendingShards} queue with initial state, i.e. where in the new shard we should start
+ * consuming from.
+ */
+public class ShardDiscoverer implements Runnable {
--- End diff --

@rmetzger double check: are we still adding a configurable discovery frequency 
with a 30-second default for this?



[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352751#comment-15352751
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2131
  
@rmetzger
Thanks a lot for your helpful review :) I should be able to address your 
comments before tomorrow so we can start testing it.




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352738#comment-15352738
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68731349
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -155,55 +199,79 @@ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
 	 * @param startingSeqNum the sequence number that the iterator will start from
 	 * @return the shard iterator
 	 */
+	@Override
 	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
-		return kinesisClient.getShardIterator(shard.getStreamName(), shard.getShardId(), shardIteratorType, startingSeqNum).getShardIterator();
+		return kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum).getShardIterator();
--- End diff --

Okay. Actually, the 5 requests/sec limit for `getShardIterator()` is per shard. 
We'll only have one thread making this call per shard, so the rate limit 
won't be a problem for us. But as explained above, we should still retry and 
then fail hard, so the user knows if they need to adjust other non-Flink 
consumers.
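The retry-then-fail behavior proposed in this comment might look roughly like the following sketch (illustrative; the exception handling and limits here are assumptions, not the connector's actual code):

```java
// Sketch of bounded retry with exponential backoff for a throttled AWS call.
// After the retries are exhausted we fail hard, so the user learns that other
// non-Flink consumers may be eating into the per-shard rate limit.
public class BoundedRetry {

    @FunctionalInterface
    interface ThrottledCall<T> {
        T call();   // e.g. a getShardIterator-style request that may throw on throttling
    }

    public static <T> T callWithRetry(ThrottledCall<T> call, int maxAttempts, long baseBackoffMillis)
            throws InterruptedException {
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (RuntimeException e) {       // stand-in for a throttling exception
                last = e;
                if (attempt < maxAttempts - 1) {
                    Thread.sleep(baseBackoffMillis << attempt);  // exponential backoff
                }
            }
        }
        throw new RuntimeException("Rate limit still exceeded after " + maxAttempts
                + " attempts; check for other consumers on the same shard", last);
    }
}
```

Failing with an explicit message after the retries is what surfaces the misconfiguration to the user instead of silently stalling the shard.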




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352733#comment-15352733
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68730716
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -17,156 +17,489 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Map;
-import java.util.HashMap;
+
+import java.util.LinkedList;
 import java.util.List;
-import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A Kinesis Data Fetcher that consumes data from a specific set of 
Kinesis shards.
- * The fetcher spawns a single thread for connection to each shard.
+ * A KinesisDataFetcher is responsible for fetching data from multiple 
Kinesis shards. Each parallel subtask instantiates
+ * and runs a single fetcher throughout the subtask's lifetime. The 
fetcher runs several threads to accomplish
+ * the following:
+ * <ul>
+ *     <li>1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset
+ *         of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be
+ *         subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe
+ *         to the same subset of shards even after restoring)</li>
+ *     <li>2. decide where in each discovered shard should the fetcher start subscribing to</li>
+ *     <li>3. subscribe to shards by creating a single thread for each shard</li>
+ * </ul>
+ *
+ * The fetcher manages two states: 1) pending shards for subscription, 
and 2) last processed sequence numbers of
+ * each subscribed shard. All operations on the states in multiple threads 
should only be done using the handler methods
+ * provided in this class.
  */
-public class KinesisDataFetcher {
+public class KinesisDataFetcher<T> {
 
private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataFetcher.class);
 
-   /** Config properties for the Flink Kinesis Consumer */
+   // ------------------------------------------------------------------------
+   //  Consumer-wide settings
+   // ------------------------------------------------------------------------
+
+   /** Configuration properties for the Flink Kinesis Consumer */
private final Properties configProps;
 
-   /** The name of the consumer task that this fetcher was instantiated */
-   private final String taskName;
+   /** The list of Kinesis streams that the consumer is subscribing to */
+   private final List<String> streams;
+
+   /**
+* The deserialization schema we will be using to convert Kinesis 
records to Flink objects.
+* Note that since this might not be thread-safe, multiple threads in 
the fetcher using this must
+* clone a copy using {@link 
KinesisDataFetcher#getClonedDeserializationSchema()}.
+*/
+   private final KinesisDeserializationSchema<T> deserializationSchema;
+
+   // ------------------------------------------------------------------------
+   //  Subtask-specific settings
+   // ------------------------------------------------------------------------
+
+   /** Runtime context of the subtask that this fetcher was created in */
+   private final RuntimeContext runtimeContext;
+
+   // 

[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352705#comment-15352705
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68729145
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -155,55 +199,79 @@ public GetRecordsResult getRecords(String 
shardIterator, int maxRecordsToGet) {
 * @param startingSeqNum the sequence number that the iterator will 
start from
 * @return the shard iterator
 */
+   @Override
public String getShardIterator(KinesisStreamShard shard, String 
shardIteratorType, String startingSeqNum) {
-   return kinesisClient.getShardIterator(shard.getStreamName(), 
shard.getShardId(), shardIteratorType, startingSeqNum).getShardIterator();
+   return kinesisClient.getShardIterator(shard.getStreamName(), 
shard.getShard().getShardId(), shardIteratorType, 
startingSeqNum).getShardIterator();
--- End diff --

Yes, I think we should have retries there as well. In particular during 
start up, many parallel Flink instances will query AWS for the shard iterator.


> Handle Kinesis-side resharding in Kinesis streaming consumer
> 
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time for adjustable 
> stream throughput capacity. This article explains this quite clearly: 
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
> mapping is done in a simple round-robin-like distribution which can be 
> locally determined at each Flink consumer task (Flink Kafka consumer does 
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer 
> tasks coordinate which shards they are currently handling, and allow the 
> tasks to ask the coordinator for a shards reassignment when the task finds 
> out it has found a closed shard at runtime (shards will be closed by Kinesis 
> when it is merged and split).
> We need a centralized coordinator state store which is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine what 
> shards it can be reassigned. Amazon KCL uses a DynamoDB table for the 
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
> KCL for the implementation of the consumer if we want to leverage Flink's 
> checkpointing mechanics. For our own implementation, Zookeeper can be used 
> for this state store, but that means it would require the user to set up ZK 
> to work.
> Since this feature introduces extensive work, it is opened as a separate 
> sub-task from the basic implementation 
> https://issues.apache.org/jira/browse/FLINK-3229.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352692#comment-15352692
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68727843
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -86,32 +144,25 @@ public KinesisProxy(Properties configProps) {
 * @param maxRecordsToGet the maximum amount of records to retrieve for 
this batch
 * @return the batch of retrieved records
 */
-   public GetRecordsResult getRecords(String shardIterator, int 
maxRecordsToGet) {
+   @Override
+   public GetRecordsResult getRecords(String shardIterator, int 
maxRecordsToGet) throws InterruptedException {
final GetRecordsRequest getRecordsRequest = new 
GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(maxRecordsToGet);
 
GetRecordsResult getRecordsResult = null;
 
-   int remainingRetryTimes = Integer.valueOf(
-   
configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES, 
Integer.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_RETRY_TIMES)));
-   long describeStreamBackoffTimeInMillis = Long.valueOf(
-   
configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF, 
Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF)));
-
-   int i=0;
-   while (i <= remainingRetryTimes && getRecordsResult == null) {
+   int attempt = 0;
+   while (attempt <= getRecordsMaxAttempts && getRecordsResult == 
null) {
try {
getRecordsResult = 
kinesisClient.getRecords(getRecordsRequest);
} catch (ProvisionedThroughputExceededException ex) {
+   long backoffMillis = fullJitterBackoff(
+   getRecordsBaseBackoffMillis, 
getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
LOG.warn("Got 
ProvisionedThroughputExceededException. Backing off for "
-   + describeStreamBackoffTimeInMillis + " 
millis.");
-   try {
-   
Thread.sleep(describeStreamBackoffTimeInMillis);
-   } catch (InterruptedException interruptEx) {
-   //
-   }
+   + backoffMillis + " millis.");
+   Thread.sleep(backoffMillis);
}
-   i++;
}
 
if (getRecordsResult == null) {
--- End diff --

Thank you for the good explanation. I agree to keep it as it is.
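For reference, the `fullJitterBackoff(base, max, expConst, attempt)` helper called in the diff above is not shown in this excerpt. A minimal sketch consistent with its call site could look like the following; this is an illustrative reconstruction, not the actual Flink implementation.

```java
import java.util.concurrent.ThreadLocalRandom;

public class FullJitterBackoff {

    /**
     * Illustrative sketch of "full jitter" exponential backoff: cap the
     * exponential term at maxMillis, then sleep a uniformly random fraction
     * of it. The randomness de-synchronizes many parallel subtasks that
     * would otherwise retry against Kinesis in lock-step.
     */
    static long fullJitterBackoff(long baseMillis, long maxMillis, double expConst, int attempt) {
        double capped = Math.min(maxMillis, baseMillis * Math.pow(expConst, attempt));
        return (long) (ThreadLocalRandom.current().nextDouble() * capped);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt <= 4; attempt++) {
            System.out.println("attempt " + attempt + " -> back off "
                    + fullJitterBackoff(100L, 2000L, 2.0, attempt) + " ms");
        }
    }
}
```

With full jitter, the expected sleep still grows exponentially with the attempt count, but concurrent retries spread out over the whole interval instead of colliding at the same instant.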



[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352676#comment-15352676
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68726797
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardDiscoverer.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This runnable is in charge of discovering new shards that a fetcher 
should subscribe to.
+ * It is submitted to {@link 
KinesisDataFetcher#shardDiscovererAndSubscriberExecutor} and continuously runs 
until the
+ * fetcher is closed. Whenever it discovers a new shard that should be 
subscribed to, the shard is added to the
+ * {@link KinesisDataFetcher#pendingShards} queue with initial state, i.e. 
where in the new shard we should start
+ * consuming from.
+ */
+public class ShardDiscoverer implements Runnable {
--- End diff --

I agree with
> Simply put, I don't think we can soley rely on the event of encountering 
a closed shard to let the new shards get fully picked up by the correct 
subtasks.

My idea was to trigger an additional discovery when we hit a closed shard.
We could, for example, discover shards every minute OR when we hit a closed
shard. But you are right, this won't solve the issue with other parallel
instances.
Let's keep it as is for now.
Thank you for the explanation.

Let's keep the assignment from `ShardDiscoverer#isShouldSubscribeTo()`.
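The interval-plus-wakeup discovery idea discussed above could be sketched like this; all names are hypothetical and this is not the actual `ShardDiscoverer`. A blocking queue doubles as both the timer (poll with timeout) and the early wake-up channel.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical sketch: run shard discovery on a fixed interval, but also
 * wake up early when a consumer thread reports a closed shard.
 */
public class PeriodicDiscoverySketch {

    private final BlockingQueue<Object> wakeUps = new LinkedBlockingQueue<>();
    private final AtomicBoolean running = new AtomicBoolean(true);

    /** Called by a ShardConsumer-like thread when it encounters a closed shard. */
    public void onClosedShard() {
        wakeUps.offer(new Object());
    }

    /** Discovery loop: fires every intervalMillis, or earlier if a wake-up arrives. */
    public void runDiscoveryLoop(long intervalMillis, Runnable discoverShards) throws InterruptedException {
        while (running.get()) {
            discoverShards.run();
            // Blocks up to intervalMillis; returns early when onClosedShard() signals.
            wakeUps.poll(intervalMillis, TimeUnit.MILLISECONDS);
            wakeUps.clear(); // coalesce multiple signals into one discovery pass
        }
    }

    public void stop() {
        running.set(false);
        wakeUps.offer(new Object()); // unblock a loop waiting in poll()
    }
}
```

As noted above, this still only shortens the latency on the subtask that hit the closed shard; it does not by itself make other parallel instances pick up the new shards any sooner.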



[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352638#comment-15352638
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68722233
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -17,156 +17,489 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Map;
-import java.util.HashMap;
+
+import java.util.LinkedList;
 import java.util.List;
-import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A Kinesis Data Fetcher that consumes data from a specific set of 
Kinesis shards.
- * The fetcher spawns a single thread for connection to each shard.
+ * A KinesisDataFetcher is responsible for fetching data from multiple 
Kinesis shards. Each parallel subtask instantiates
+ * and runs a single fetcher throughout the subtask's lifetime. The 
fetcher runs several threads to accomplish
+ * the following:
+ * <ul>
+ *     <li>1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset
+ *         of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be
+ *         subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe
+ *         to the same subset of shards even after restoring)</li>
+ *     <li>2. decide where in each discovered shard should the fetcher start subscribing to</li>
+ *     <li>3. subscribe to shards by creating a single thread for each shard</li>
+ * </ul>
+ *
+ * The fetcher manages two states: 1) pending shards for subscription, 
and 2) last processed sequence numbers of
+ * each subscribed shard. All operations on the states in multiple threads 
should only be done using the handler methods
+ * provided in this class.
  */
-public class KinesisDataFetcher {
+public class KinesisDataFetcher<T> {
 
private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataFetcher.class);
 
-   /** Config properties for the Flink Kinesis Consumer */
+   // ------------------------------------------------------------------------
+   //  Consumer-wide settings
+   // ------------------------------------------------------------------------
+
+   /** Configuration properties for the Flink Kinesis Consumer */
private final Properties configProps;
 
-   /** The name of the consumer task that this fetcher was instantiated */
-   private final String taskName;
+   /** The list of Kinesis streams that the consumer is subscribing to */
+   private final List<String> streams;
+
+   /**
+* The deserialization schema we will be using to convert Kinesis 
records to Flink objects.
+* Note that since this might not be thread-safe, multiple threads in 
the fetcher using this must
+* clone a copy using {@link 
KinesisDataFetcher#getClonedDeserializationSchema()}.
+*/
+   private final KinesisDeserializationSchema<T> deserializationSchema;
+
+   // ------------------------------------------------------------------------
+   //  Subtask-specific settings
+   // ------------------------------------------------------------------------
+
+   /** Runtime context of the subtask that this fetcher was created in */
+   private final RuntimeContext runtimeContext;
+
+   // 

[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352540#comment-15352540
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68711841
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -155,55 +199,79 @@ public GetRecordsResult getRecords(String 
shardIterator, int maxRecordsToGet) {
 * @param startingSeqNum the sequence number that the iterator will 
start from
 * @return the shard iterator
 */
+   @Override
public String getShardIterator(KinesisStreamShard shard, String 
shardIteratorType, String startingSeqNum) {
-   return kinesisClient.getShardIterator(shard.getStreamName(), 
shard.getShardId(), shardIteratorType, startingSeqNum).getShardIterator();
+   return kinesisClient.getShardIterator(shard.getStreamName(), 
shard.getShard().getShardId(), shardIteratorType, 
startingSeqNum).getShardIterator();
--- End diff --

@rmetzger This method doesn't do the 
`ProvisionedThroughputExceededException` check / retries like the 
`getRecords()` method. Perhaps we should be checking that + fail hard after 
retrying too, as it also has a 5 requests per second limit? Reasoning is the 
same as my reply to your comment on the `getRecords()` method.




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352370#comment-15352370
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68698386
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -86,32 +144,25 @@ public KinesisProxy(Properties configProps) {
 * @param maxRecordsToGet the maximum amount of records to retrieve for 
this batch
 * @return the batch of retrieved records
 */
-   public GetRecordsResult getRecords(String shardIterator, int 
maxRecordsToGet) {
+   @Override
+   public GetRecordsResult getRecords(String shardIterator, int 
maxRecordsToGet) throws InterruptedException {
final GetRecordsRequest getRecordsRequest = new 
GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(maxRecordsToGet);
 
GetRecordsResult getRecordsResult = null;
 
-   int remainingRetryTimes = Integer.valueOf(
-   
configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES, 
Integer.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_RETRY_TIMES)));
-   long describeStreamBackoffTimeInMillis = Long.valueOf(
-   
configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF, 
Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF)));
-
-   int i=0;
-   while (i <= remainingRetryTimes && getRecordsResult == null) {
+   int attempt = 0;
+   while (attempt <= getRecordsMaxAttempts && getRecordsResult == 
null) {
try {
getRecordsResult = 
kinesisClient.getRecords(getRecordsRequest);
} catch (ProvisionedThroughputExceededException ex) {
+   long backoffMillis = fullJitterBackoff(
+   getRecordsBaseBackoffMillis, 
getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
LOG.warn("Got 
ProvisionedThroughputExceededException. Backing off for "
-   + describeStreamBackoffTimeInMillis + " 
millis.");
-   try {
-   
Thread.sleep(describeStreamBackoffTimeInMillis);
-   } catch (InterruptedException interruptEx) {
-   //
-   }
+   + backoffMillis + " millis.");
+   Thread.sleep(backoffMillis);
}
-   i++;
}
 
if (getRecordsResult == null) {
--- End diff --

My reasoning for failing hard here and not in the `describeStream()`
operation for limit / provisioned throughput exceeded exceptions is that I think
rate exceeding for `getRecords()` on a single shard may be caused by numerous
reasons that the user should handle:
1. The user might have other non-Flink concurrent consumers reading the 
same shard (Kinesis limits to 5 concurrent reads per shard.) The user should 
either slow down the non-Flink consumers, or slow down our `ShardConsumer` by 
adjusting the `KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_IDLE_MILLIS` we 
will be introducing in https://github.com/apache/flink/pull/2071.
2. The size of each record is too large, and the user needs to adjust 
`KinesisConfigConstants.CONFIG_SHARD_GETRECORDS_MAX` to a smaller value.

`describeStream()` on the other hand is an internal operation we need to 
continuously do, and make sure we get results for the connector to work.



[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352343#comment-15352343
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68696712
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -86,32 +144,25 @@ public KinesisProxy(Properties configProps) {
 * @param maxRecordsToGet the maximum amount of records to retrieve for 
this batch
 * @return the batch of retrieved records
 */
-   public GetRecordsResult getRecords(String shardIterator, int 
maxRecordsToGet) {
+   @Override
+   public GetRecordsResult getRecords(String shardIterator, int 
maxRecordsToGet) throws InterruptedException {
final GetRecordsRequest getRecordsRequest = new 
GetRecordsRequest();
getRecordsRequest.setShardIterator(shardIterator);
getRecordsRequest.setLimit(maxRecordsToGet);
 
GetRecordsResult getRecordsResult = null;
 
-   int remainingRetryTimes = Integer.valueOf(
-   
configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES, 
Integer.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_RETRY_TIMES)));
-   long describeStreamBackoffTimeInMillis = Long.valueOf(
-   
configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF, 
Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF)));
-
-   int i=0;
-   while (i <= remainingRetryTimes && getRecordsResult == null) {
+   int attempt = 0;
+   while (attempt <= getRecordsMaxAttempts && getRecordsResult == 
null) {
try {
getRecordsResult = 
kinesisClient.getRecords(getRecordsRequest);
} catch (ProvisionedThroughputExceededException ex) {
+   long backoffMillis = fullJitterBackoff(
+   getRecordsBaseBackoffMillis, 
getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
LOG.warn("Got 
ProvisionedThroughputExceededException. Backing off for "
-   + describeStreamBackoffTimeInMillis + " 
millis.");
-   try {
-   
Thread.sleep(describeStreamBackoffTimeInMillis);
-   } catch (InterruptedException interruptEx) {
-   //
-   }
+   + backoffMillis + " millis.");
+   Thread.sleep(backoffMillis);
}
-   i++;
}
 
if (getRecordsResult == null) {
--- End diff --

Misunderstanding here.
The `getRecordsResult` will only be `null` here if we fail to get any 
response from Kinesis due to `ProvisionedThroughputExceededException` even 
after 3 retries (default setting of 
`KinesisConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES`). We'll always get a 
`GetRecordsResult` regardless of the status of the shard.

What the javadoc is saying is that if we reached the end of a shard (the 
shard is closed) `GetRecordsResult#getNextShardIterator()` will return a 
`null`, meaning that we won't be able to continue consuming the shard. In 
`ShardConsumer`, we always renew the `nextShardIterator` with the value 
returned in `GetRecordsResult`. If we get a `null`, we know the shard is closed 
and terminate the `ShardConsumer` thread. So, no more `getRecords()` calls will 
be made to the shard after it is closed.
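The loop behaviour described above can be sketched as follows. The `Result` and `Fetcher` types here are minimal stand-ins for the AWS SDK's `GetRecordsResult` and the Kinesis proxy, so this is an illustrative model of `ShardConsumer`'s termination condition, not the actual code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ShardConsumeLoopSketch {

    /** Stand-in for the AWS SDK's GetRecordsResult. */
    static class Result {
        final List<String> records;
        final String nextShardIterator; // null once the shard is closed
        Result(List<String> records, String nextShardIterator) {
            this.records = records;
            this.nextShardIterator = nextShardIterator;
        }
    }

    /** Stand-in for the proxy's getRecords call. */
    interface Fetcher {
        Result getRecords(String shardIterator);
    }

    /** Consumes until the shard reports a null next iterator; returns records seen. */
    static int consumeShard(String initialIterator, Fetcher fetcher) {
        int consumed = 0;
        String shardIterator = initialIterator;
        while (shardIterator != null) {
            Result result = fetcher.getRecords(shardIterator);
            consumed += result.records.size();
            // Always renew the iterator from the result; null => shard closed by a reshard.
            shardIterator = result.nextShardIterator;
        }
        return consumed;
    }

    public static void main(String[] args) {
        // Two batches, then the shard closes (null next iterator).
        Fetcher fake = it -> it.equals("it-0")
                ? new Result(Arrays.asList("a", "b"), "it-1")
                : new Result(Collections.singletonList("c"), null);
        System.out.println(consumeShard("it-0", fake)); // prints 3
    }
}
```

Because the iterator is renewed from every result, no further `getRecords()` calls can be issued once the shard is closed, which matches the termination behaviour described in the comment above.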



[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352298#comment-15352298
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68694102
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardDiscoverer.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This runnable is in charge of discovering new shards that a fetcher should subscribe to.
+ * It is submitted to {@link KinesisDataFetcher#shardDiscovererAndSubscriberExecutor} and continuously runs until the
+ * fetcher is closed. Whenever it discovers a new shard that should be subscribed to, the shard is added to the
+ * {@link KinesisDataFetcher#pendingShards} queue with initial state, i.e. where in the new shard we should start
+ * consuming from.
+ */
+public class ShardDiscoverer implements Runnable {
--- End diff --

Some feedback for this approach:
At first I had thought of the solution you mentioned. The problem is that the new shards will not necessarily belong to the subtask that hit the closed shard (according to the new way we are assigning shards to subtasks in `ShardDiscoverer#isShouldSubscribeTo()`), so we still need the other subtasks to continuously poll Kinesis in order for the right subtasks to pick up the new shards. Simply put, I don't think we can solely rely on the event of encountering a closed shard to let the new shards get fully picked up by the correct subtasks.

We could drop the assignment logic in `ShardDiscoverer#isShouldSubscribeTo()` and simply make each subtask responsible for the new child shards of the closed shards it was previously consuming. However, with this approach we would be subject to severe load imbalance, because Kinesis users can freely choose which shards to split and merge. To counter this we would need a rebalance mechanism, as well as a "merged snapshot" on restore after failure.
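For context, the deterministic, locally-computable assignment being discussed can be sketched roughly as a pure hash over the shard id (a simplified illustration only; the class and method names here are hypothetical, not the actual `ShardDiscoverer#isShouldSubscribeTo()` implementation):

```java
// Sketch of deterministic shard-to-subtask assignment. Every subtask
// evaluates the same pure function over a shard id, so all subtasks
// independently agree on which one of them owns each shard with no
// coordination -- but a new child shard of a split/merge may hash to a
// different subtask than the parent shard's owner.
public class ShardAssignment {

    /** Deterministically map a shard id to the index of the owning subtask. */
    public static int subtaskFor(String shardId, int totalSubtasks) {
        // hashCode() can be Integer.MIN_VALUE, so mask to non-negative instead of Math.abs
        return (shardId.hashCode() & Integer.MAX_VALUE) % totalSubtasks;
    }

    /** A subtask should subscribe to a shard iff the shard hashes to its own index. */
    public static boolean shouldSubscribeTo(String shardId, int totalSubtasks, int subtaskIndex) {
        return subtaskFor(shardId, totalSubtasks) == subtaskIndex;
    }
}
```

Because the mapping depends only on the shard id and the parallelism, it is stable across restores, which is the property that makes the closed-shard-event-only discovery insufficient on its own.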



[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351210#comment-15351210
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68594061
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -86,32 +144,25 @@ public KinesisProxy(Properties configProps) {
 * @param maxRecordsToGet the maximum amount of records to retrieve for this batch
 * @return the batch of retrieved records
 */
-	public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
+	@Override
+	public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException {
		final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
		getRecordsRequest.setShardIterator(shardIterator);
		getRecordsRequest.setLimit(maxRecordsToGet);

		GetRecordsResult getRecordsResult = null;

-		int remainingRetryTimes = Integer.valueOf(
-			configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES, Integer.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_RETRY_TIMES)));
-		long describeStreamBackoffTimeInMillis = Long.valueOf(
-			configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF, Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF)));
-
-		int i=0;
-		while (i <= remainingRetryTimes && getRecordsResult == null) {
+		int attempt = 0;
+		while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
			try {
				getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
			} catch (ProvisionedThroughputExceededException ex) {
+				long backoffMillis = fullJitterBackoff(
+					getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
				LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
-					+ describeStreamBackoffTimeInMillis + " millis.");
-				try {
-					Thread.sleep(describeStreamBackoffTimeInMillis);
-				} catch (InterruptedException interruptEx) {
-					//
-				}
+					+ backoffMillis + " millis.");
+				Thread.sleep(backoffMillis);
			}
-			i++;
		}

		if (getRecordsResult == null) {
 
if (getRecordsResult == null) {
--- End diff --

Not sure if throwing an exception here is a good idea.
The javadoc of `getRecords()` says: "Note that if the shard has been closed, the shard iterator can't return more data and GetRecords returns null". Doesn't this mean our code would fail every time a shard is closed?
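For reference, the `fullJitterBackoff` used in the diff above follows the standard AWS "full jitter" scheme: grow the backoff exponentially up to a cap, then sleep a uniformly random duration in `[0, cap)`. A rough, self-contained sketch (the parameter names mirror the diff, but this is an illustration, not the connector's code):

```java
import java.util.Random;

// Sketch of full-jitter exponential backoff: the cap grows as
// base * expConstant^attempt bounded by maxMillis, and the actual sleep is
// a uniformly random value below that cap, which spreads retries from many
// concurrent consumers apart in time instead of retrying in lockstep.
public class FullJitterBackoff {

    private static final Random RND = new Random();

    public static long fullJitterBackoff(long baseMillis, long maxMillis, double expConstant, int attempt) {
        double exponentialCap = Math.min((double) maxMillis, baseMillis * Math.pow(expConstant, attempt));
        return (long) (RND.nextDouble() * exponentialCap);  // uniform in [0, exponentialCap)
    }
}
```

The randomness is the point: with a fixed backoff, all throttled subtasks would hit `ProvisionedThroughputExceededException` again at the same instant.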



[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351101#comment-15351101
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2131
  
@rmetzger 
No problem, thank you! I'll reply to your comments after you finish 
reviewing :)





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351093#comment-15351093
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2131
  
@tzulitai: I'm still not done with the review. I hope I can get it done in 
the next two hours.







[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351089#comment-15351089
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68583275
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -17,156 +17,489 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Map;
-import java.util.HashMap;
+
+import java.util.LinkedList;
 import java.util.List;
-import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards.
- * The fetcher spawns a single thread for connection to each shard.
+ * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards. Each parallel subtask instantiates
+ * and runs a single fetcher throughout the subtask's lifetime. The fetcher runs several threads to accomplish
+ * the following:
+ *
+ * 1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset
+ *    of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be
+ *    subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe
+ *    to the same subset of shards even after restoring)
+ * 2. decide where in each discovered shard should the fetcher start subscribing to
+ * 3. subscribe to shards by creating a single thread for each shard
+ *
+ * The fetcher manages two states: 1) pending shards for subscription, and 2) last processed sequence numbers of
+ * each subscribed shard. All operations on the states in multiple threads should only be done using the handler methods
+ * provided in this class.
  */
-public class KinesisDataFetcher {
+public class KinesisDataFetcher {
 
private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataFetcher.class);
 
-   /** Config properties for the Flink Kinesis Consumer */
+	// ------------------------------------------------------------------------
+	//  Consumer-wide settings
+	// ------------------------------------------------------------------------
+
+	/** Configuration properties for the Flink Kinesis Consumer */
	private final Properties configProps;

-	/** The name of the consumer task that this fetcher was instantiated */
-	private final String taskName;
+	/** The list of Kinesis streams that the consumer is subscribing to */
+	private final List streams;
+
+	/**
+	 * The deserialization schema we will be using to convert Kinesis records to Flink objects.
+	 * Note that since this might not be thread-safe, multiple threads in the fetcher using this must
+	 * clone a copy using {@link KinesisDataFetcher#getClonedDeserializationSchema()}.
+	 */
+	private final KinesisDeserializationSchema deserializationSchema;
+
+	// ------------------------------------------------------------------------
+	//  Subtask-specific settings
+	// ------------------------------------------------------------------------
+
+	/** Runtime context of the subtask that this fetcher was created in */
+	private final RuntimeContext runtimeContext;
+
+	// ------------------------------------------------------------------------

[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351076#comment-15351076
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68582645
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardDiscoverer.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This runnable is in charge of discovering new shards that a fetcher should subscribe to.
+ * It is submitted to {@link KinesisDataFetcher#shardDiscovererAndSubscriberExecutor} and continuously runs until the
+ * fetcher is closed. Whenever it discovers a new shard that should be subscribed to, the shard is added to the
+ * {@link KinesisDataFetcher#pendingShards} queue with initial state, i.e. where in the new shard we should start
+ * consuming from.
+ */
+public class ShardDiscoverer implements Runnable {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ShardDiscoverer.class);
+
+	/** This fetcher reference is used to add discovered shards to the pending shards queue */
+	private final KinesisDataFetcher fetcherRef;
+
+	/** Kinesis proxy to retrieve shard lists from Kinesis */
+	private final KinesisProxyInterface kinesis;
+
+	/**
+	 * The last seen shard of each stream. Since new Kinesis shards are always created in ascending ids (regardless of
+	 * whether the new shard was a result of a shard split or merge), this state can be used when calling
+	 * {@link KinesisProxyInterface#getShardList(Map)} to ignore shards we have already discovered before.
+	 */
+	private final Map streamToLastSeenShard;
+
+	private final int totalNumberOfConsumerSubtasks;
+	private final int indexOfThisConsumerSubtask;
+
+	/**
+	 * Create a new shard discoverer.
+	 *
+	 * @param fetcherRef reference to the owning fetcher
+	 */
+	public ShardDiscoverer(KinesisDataFetcher fetcherRef) {
+		this(fetcherRef, KinesisProxy.create(fetcherRef.getConsumerConfiguration()), new HashMap());
+	}
+
+	/** This constructor is exposed for testing purposes */
+	protected ShardDiscoverer(KinesisDataFetcher fetcherRef,
+							KinesisProxyInterface kinesis,
+							Map streamToLastSeenShard) {
+		this.fetcherRef = checkNotNull(fetcherRef);
+		this.kinesis = checkNotNull(kinesis);
+		this.streamToLastSeenShard = checkNotNull(streamToLastSeenShard);
+
+		this.totalNumberOfConsumerSubtasks = fetcherRef.getSubtaskRuntimeContext().getNumberOfParallelSubtasks();
+		this.indexOfThisConsumerSubtask = fetcherRef.getSubtaskRuntimeContext().getIndexOfThisSubtask();

[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351077#comment-15351077
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68582684
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+
+import java.util.Map;
+
+public interface KinesisProxyInterface {
--- End diff --

Sure, no problem.







[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351071#comment-15351071
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68582306
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardDiscoverer.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This runnable is in charge of discovering new shards that a fetcher should subscribe to.
+ * It is submitted to {@link KinesisDataFetcher#shardDiscovererAndSubscriberExecutor} and continuously runs until the
+ * fetcher is closed. Whenever it discovers a new shard that should be subscribed to, the shard is added to the
+ * {@link KinesisDataFetcher#pendingShards} queue with initial state, i.e. where in the new shard we should start
+ * consuming from.
+ */
+public class ShardDiscoverer implements Runnable {
--- End diff --

Okay. I'm still not done with the review, so my statement might not make much sense yet:
We could also trigger a special shard discovery when a reading thread hits a closed / unavailable shard. This way, we can reduce the delay for our users.
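The suggestion above amounts to letting a consumer thread wake the discoverer early instead of waiting out the full poll interval. One way to sketch that with a guarded wait (a hypothetical helper for illustration, not part of the PR):

```java
// Sketch of an interruptible discovery interval: the discoverer normally
// sleeps for the poll interval between discovery rounds, but a consumer
// thread that hits a closed shard can cut the wait short so the new child
// shards are discovered (and reassigned) sooner.
public class DiscoveryTrigger {

    private final Object lock = new Object();
    private boolean triggered = false;

    /** Called by a consumer thread when it encounters a closed shard. */
    public void requestImmediateDiscovery() {
        synchronized (lock) {
            triggered = true;
            lock.notifyAll();
        }
    }

    /**
     * Called by the discoverer between discovery rounds. Returns after the
     * poll interval elapses, or earlier if a trigger arrived; resets the
     * trigger and reports whether the wake-up was triggered.
     */
    public boolean awaitNextRound(long pollIntervalMillis) throws InterruptedException {
        synchronized (lock) {
            if (!triggered) {
                lock.wait(pollIntervalMillis);
            }
            boolean wasTriggered = triggered;
            triggered = false;
            return wasTriggered;
        }
    }
}
```

Note that the periodic poll is still needed regardless, since (as discussed above) the subtask that observes the closed shard is not necessarily the one that will own the new child shards.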



[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351069#comment-15351069
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68582164
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -17,156 +17,489 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Map;
-import java.util.HashMap;
+
+import java.util.LinkedList;
 import java.util.List;
-import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A Kinesis Data Fetcher that consumes data from a specific set of 
Kinesis shards.
- * The fetcher spawns a single thread for connection to each shard.
+ * A KinesisDataFetcher is responsible for fetching data from multiple 
Kinesis shards. Each parallel subtask instantiates
+ * and runs a single fetcher throughout the subtask's lifetime. The 
fetcher runs several threads to accomplish
+ * the following:
+ * 
+ * 1. continuously poll Kinesis to discover shards that the 
subtask should subscribe to. The subscribed subset
+ *   of shards, including future new shards, is 
non-overlapping across subtasks (no two subtasks will be
+ *   subscribed to the same shard) and determinate across 
subtask restores (the subtask will always subscribe
+ *   to the same subset of shards even after 
restoring)
+ * 2. decide where in each discovered shard should the fetcher 
start subscribing to
+ * 3. subscribe to shards by creating a single thread for each 
shard
+ * 
+ *
+ * The fetcher manages two states: 1) pending shards for subscription, 
and 2) last processed sequence numbers of
+ * each subscribed shard. All operations on the states in multiple threads 
should only be done using the handler methods
+ * provided in this class.
  */
-public class KinesisDataFetcher {
+public class KinesisDataFetcher {
 
private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataFetcher.class);
 
-   /** Config properties for the Flink Kinesis Consumer */
+   // 

+   //  Consumer-wide settings
+   // 

+
+   /** Configuration properties for the Flink Kinesis Consumer */
private final Properties configProps;
 
-   /** The name of the consumer task that this fetcher was instantiated */
-   private final String taskName;
+   /** The list of Kinesis streams that the consumer is subscribing to */
+   private final List streams;
+
+   /**
+* The deserialization schema we will be using to convert Kinesis 
records to Flink objects.
+* Note that since this might not be thread-safe, multiple threads in 
the fetcher using this must
+* clone a copy using {@link 
KinesisDataFetcher#getClonedDeserializationSchema()}.
+*/
+   private final KinesisDeserializationSchema deserializationSchema;
+
+   // 

+   //  Subtask-specific settings
+   // 

+
+   /** Runtime context of the subtask that this fetcher was created in */
+   private final RuntimeContext runtimeContext;
+
+   // 

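The deterministic, non-overlapping shard-to-subtask assignment described in the Javadoc above can be sketched as follows. This is illustrative only: the hash-modulo rule and all names below are assumptions for the sketch, not the actual code from the pull request.

```java
import java.util.Arrays;
import java.util.List;

// Sketch: each shard maps to exactly one subtask, and the mapping depends only
// on the shard's identity and the parallelism, so it is stable across restores.
public class ShardAssignmentSketch {

	// hypothetical assignment rule; the real connector may distribute differently
	static int ownerSubtask(String shardId, int parallelism) {
		return Math.abs(shardId.hashCode() % parallelism);
	}

	public static void main(String[] args) {
		List<String> shards = Arrays.asList(
			"shardId-000000000000", "shardId-000000000001", "shardId-000000000002");
		int parallelism = 2;
		for (String shard : shards) {
			// Each shard resolves to a single, deterministic owner in [0, parallelism).
			System.out.println(shard + " -> subtask " + ownerSubtask(shard, parallelism));
		}
	}
}
```

Because the owner is a pure function of the shard id, newly discovered shards after a reshard land on a well-defined subtask without any cross-task coordination.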
[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351027#comment-15351027
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68578735
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardDiscoverer.java ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This runnable is in charge of discovering new shards that a fetcher should subscribe to.
+ * It is submitted to {@link KinesisDataFetcher#shardDiscovererAndSubscriberExecutor} and continuously runs until the
+ * fetcher is closed. Whenever it discovers a new shard that should be subscribed to, the shard is added to the
+ * {@link KinesisDataFetcher#pendingShards} queue with initial state, i.e. where in the new shard we should start
+ * consuming from.
+ */
+public class ShardDiscoverer<T> implements Runnable {
--- End diff --

You are right, this thread continuously polls Kinesis. I was worried about the delay it would introduce, and how users would perceive that delay given that this is a streaming source.
But yes, I agree it would be best to introduce a configurable polling frequency; I was initially a bit unsure about the continuous polling myself. We can describe this delay in discovering new shards after a resharding in the consumer documentation.
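The configurable discovery frequency discussed here could look roughly like the sketch below. The field names, the 30-second default, and the loop shape are assumptions for illustration, not the connector's actual implementation.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: poll for reshards at a fixed, configurable interval instead of
// continuously hammering the Kinesis API.
public class ShardDiscoveryLoopSketch implements Runnable {

	// hypothetical default: query for new shards only every 30 seconds
	static final long DEFAULT_DISCOVERY_INTERVAL_MILLIS = 30_000L;

	private final long discoveryIntervalMillis;
	private final AtomicBoolean running = new AtomicBoolean(true);

	public ShardDiscoveryLoopSketch(long discoveryIntervalMillis) {
		this.discoveryIntervalMillis = discoveryIntervalMillis;
	}

	@Override
	public void run() {
		while (running.get()) {
			discoverNewShardsOnce();
			try {
				// throttle the API calls; jittered backoff could be layered on top
				Thread.sleep(discoveryIntervalMillis);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				return;
			}
		}
	}

	void discoverNewShardsOnce() {
		// placeholder for a shard-list query against Kinesis
	}

	public void shutdown() {
		running.set(false);
	}
}
```

The trade-off is exactly the one raised in the review: a longer interval means fewer API calls but a longer delay before records from newly split or merged shards are consumed.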



[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351030#comment-15351030
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68579220
  

[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15350888#comment-15350888
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68564678
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardDiscoverer.java ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This runnable is in charge of discovering new shards that a fetcher should subscribe to.
+ * It is submitted to {@link KinesisDataFetcher#shardDiscovererAndSubscriberExecutor} and continuously runs until the
+ * fetcher is closed. Whenever it discovers a new shard that should be subscribed to, the shard is added to the
+ * {@link KinesisDataFetcher#pendingShards} queue with initial state, i.e. where in the new shard we should start
+ * consuming from.
+ */
+public class ShardDiscoverer<T> implements Runnable {
--- End diff --

It seems that this thread is continuously querying the Kinesis APIs; the only way to slow these requests down is the jittered backoff.
I think we should query the Kinesis API for reshards at a configurable frequency, probably by default only every 30 seconds or so.



[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15350875#comment-15350875
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68564082
  

[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15350850#comment-15350850
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68562955
  

[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15350815#comment-15350815
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68560835
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardDiscoverer.java ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This runnable is in charge of discovering new shards that a fetcher should subscribe to.
+ * It is submitted to {@link KinesisDataFetcher#shardDiscovererAndSubscriberExecutor} and continuously runs until the
+ * fetcher is closed. Whenever it discovers a new shard that should be subscribed to, the shard is added to the
+ * {@link KinesisDataFetcher#pendingShards} queue with initial state, i.e. where in the new shard we should start
+ * consuming from.
+ */
+public class ShardDiscoverer<T> implements Runnable {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ShardDiscoverer.class);
+
+	/** This fetcher reference is used to add discovered shards to the pending shards queue */
+	private final KinesisDataFetcher<T> fetcherRef;
+
+	/** Kinesis proxy to retrieve shard lists from Kinesis */
+	private final KinesisProxyInterface kinesis;
+
+	/**
+	 * The last seen shard of each stream. Since new Kinesis shards are always created in ascending ids (regardless of
+	 * whether the new shard was a result of a shard split or merge), this state can be used when calling
+	 * {@link KinesisProxyInterface#getShardList(Map)} to ignore shards we have already discovered before.
+	 */
+	private final Map<String, String> streamToLastSeenShard;
+
+	private final int totalNumberOfConsumerSubtasks;
+	private final int indexOfThisConsumerSubtask;
+
+	/**
+	 * Create a new shard discoverer.
+	 *
+	 * @param fetcherRef reference to the owning fetcher
+	 */
+	public ShardDiscoverer(KinesisDataFetcher<T> fetcherRef) {
+		this(fetcherRef, KinesisProxy.create(fetcherRef.getConsumerConfiguration()), new HashMap<String, String>());
+	}
+
+	/** This constructor is exposed for testing purposes */
+	protected ShardDiscoverer(KinesisDataFetcher<T> fetcherRef,
+							KinesisProxyInterface kinesis,
+							Map<String, String> streamToLastSeenShard) {
+		this.fetcherRef = checkNotNull(fetcherRef);
+		this.kinesis = checkNotNull(kinesis);
+		this.streamToLastSeenShard = checkNotNull(streamToLastSeenShard);
+
+		this.totalNumberOfConsumerSubtasks = fetcherRef.getSubtaskRuntimeContext().getNumberOfParallelSubtasks();
+		this.indexOfThisConsumerSubtask = fetcherRef.getSubtaskRuntimeContext().getIndexOfThisSubtask();
+
  
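The `streamToLastSeenShard` idea quoted above — skipping shards that were already discovered by exploiting the fact that Kinesis creates shard ids in ascending order — can be illustrated with the sketch below. The class and method names here are invented for the sketch, not the pull request's code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: remember the newest shard id seen per stream so repeated shard-list
// queries only surface shards created since the last discovery pass.
public class LastSeenShardSketch {

	// stream name -> id of the newest shard already discovered for that stream
	private final Map<String, String> streamToLastSeenShard = new HashMap<>();

	/**
	 * Returns only shards newer than the last seen one and advances the marker.
	 * Assumes the input list is in ascending shard-id order, as shard ids share
	 * the zero-padded "shardId-%012d" format and compare lexicographically.
	 */
	public List<String> filterNewShards(String stream, List<String> shardIdsFromKinesis) {
		String lastSeen = streamToLastSeenShard.get(stream);
		List<String> newShards = new ArrayList<>();
		for (String shardId : shardIdsFromKinesis) {
			if (lastSeen == null || shardId.compareTo(lastSeen) > 0) {
				newShards.add(shardId);
				lastSeen = shardId;
			}
		}
		if (lastSeen != null) {
			streamToLastSeenShard.put(stream, lastSeen);
		}
		return newShards;
	}
}
```

On each discovery pass, only the shards past the stored marker are handed to the fetcher as pending shards, so splits and merges surface as new ids without re-processing already-known shards.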

[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15350793#comment-15350793
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68558779
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyInterface.java ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+
+import java.util.Map;
+
+public interface KinesisProxyInterface {
--- End diff --

Can you add a little bit of javadocs to the class and the methods?


> Handle Kinesis-side resharding in Kinesis streaming consumer
> 
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time for adjustable 
> stream throughput capacity. This article explains this quite clearly: 
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
> mapping is done in a simple round-robin-like distribution which can be 
> locally determined at each Flink consumer task (Flink Kafka consumer does 
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer 
> tasks coordinate which shards they are currently handling, and allow the 
> tasks to ask the coordinator for a shards reassignment when the task finds 
> out it has found a closed shard at runtime (shards will be closed by Kinesis 
> when it is merged and split).
> We need a centralized coordinator state store which is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine what 
> shards it can be reassigned. Amazon KCL uses a DynamoDB table for the 
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
> KCL for the implementation of the consumer if we want to leverage Flink's 
> checkpointing mechanics. For our own implementation, Zookeeper can be used 
> for this state store, but that means it would require the user to set up ZK 
> to work.
> Since this feature introduces extensive work, it is opened as a separate 
> sub-task from the basic implementation 
> https://issues.apache.org/jira/browse/FLINK-3229.
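The static round-robin shard-to-task mapping described in the issue can be sketched as follows. This is an illustrative sketch only; `StaticShardAssigner` and its method names are assumptions, not Flink's actual consumer API:

```java
// Illustrative sketch of a locally determinable round-robin shard assignment.
// Each consumer subtask can compute its own shards with no coordination,
// which is exactly what breaks once Kinesis resharding closes/opens shards.
public class StaticShardAssigner {

    /** Shard i is handled by subtask (i mod parallelism). */
    public static int assignedSubtask(int shardIndex, int parallelism) {
        return shardIndex % parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // With 6 shards, subtasks 0 and 1 each get two shards; 2 and 3 get one.
        for (int shard = 0; shard < 6; shard++) {
            System.out.println("shard-" + shard + " -> subtask "
                + assignedSubtask(shard, parallelism));
        }
    }
}
```

Because the mapping depends only on the fixed shard index and parallelism, it cannot absorb shards created by a split or merge, hence the need for a coordinated reassignment mechanism.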



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15349496#comment-15349496
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2131
  
I've also run the unit & manual tests against the last commit.




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15349494#comment-15349494
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2131
  
Hi @rmetzger ,
I've addressed your initial comments with the last commit. No hurry on the remaining review, please take your time :) Thanks!




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348522#comment-15348522
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2131
  
Okay, sounds good.




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348516#comment-15348516
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2131
  
@rmetzger Thank you! No problem, I can wait and address the current comments meanwhile.




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348503#comment-15348503
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2131
  
I'm not done with the review yet. I hope I find some time over the weekend to continue with it.




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348419#comment-15348419
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68414043
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -86,32 +141,28 @@ public KinesisProxy(Properties configProps) {
	 * @param maxRecordsToGet the maximum amount of records to retrieve for this batch
	 * @return the batch of retrieved records
	 */
-	public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
+	public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException {
		final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
		getRecordsRequest.setShardIterator(shardIterator);
		getRecordsRequest.setLimit(maxRecordsToGet);

		GetRecordsResult getRecordsResult = null;

-		int remainingRetryTimes = Integer.valueOf(
-			configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES, Integer.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_RETRY_TIMES)));
-		long describeStreamBackoffTimeInMillis = Long.valueOf(
-			configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF, Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF)));
-
-		int i=0;
-		while (i <= remainingRetryTimes && getRecordsResult == null) {
+		Random seed = null;
+		int attempt = 0;
+		while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
			try {
				getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
			} catch (ProvisionedThroughputExceededException ex) {
-				LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
-					+ describeStreamBackoffTimeInMillis + " millis.");
-				try {
-					Thread.sleep(describeStreamBackoffTimeInMillis);
-				} catch (InterruptedException interruptEx) {
-					//
+				if (seed == null) {
+					seed = new Random();
--- End diff --

Nope. I'll change it to be created as a static class member. `getRecords` and `describeStream` can actually share the random when calling `fullJitterBackoff`.
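A minimal sketch of the full-jitter backoff being discussed, with the `Random` shared as a static member as the reviewer suggests. The class name, method signature, and exact formula are assumptions for illustration, not the merged Flink code:

```java
import java.util.Random;

// Hedged sketch of "full jitter" backoff: sleep a uniformly random time in
// [0, min(cap, base * 2^attempt)]. A single static Random can be shared by
// multiple retry loops (e.g. getRecords and describeStream).
public class FullJitterBackoff {
    private static final Random RANDOM = new Random();

    /**
     * Returns the backoff in milliseconds for the given attempt.
     * Assumes attempt is small enough that (1L << attempt) does not overflow.
     */
    public static long backoffMillis(long baseMillis, long maxMillis, int attempt) {
        long exponentialCap = Math.min(maxMillis, baseMillis * (1L << attempt));
        return (long) (RANDOM.nextDouble() * exponentialCap);
    }

    public static void main(String[] args) throws InterruptedException {
        for (int attempt = 0; attempt <= 3; attempt++) {
            long sleep = backoffMillis(100L, 1000L, attempt);
            System.out.println("attempt " + attempt + ": backing off " + sleep + " ms");
            Thread.sleep(sleep);
        }
    }
}
```

Full jitter keeps the expected wait growing exponentially while spreading retries uniformly, which avoids synchronized retry storms against a throttled shard.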



[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348412#comment-15348412
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68413466
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -206,4 +287,9 @@ private DescribeStreamResult describeStream(String streamName, String startShard
		throw new RuntimeException("Stream is not Active or Updating");
--- End diff --

I agree! We shouldn't fail hard here.




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348404#comment-15348404
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68412025
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -206,4 +287,9 @@ private DescribeStreamResult describeStream(String streamName, String startShard
		throw new RuntimeException("Stream is not Active or Updating");
--- End diff --

Actually, should we really be throwing a RuntimeException if a single stream is not in ACTIVE or UPDATING status? Say the consumer is to fetch 3 streams, and only 1 is found to be CREATING / DELETING. Perhaps we should treat this case the same as "can only find shards for some of the streams", and a warning log here will do?

There's already a check in the new `ShardDiscoverer` that stops the consumer if no shards can be found to read from initially.




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348395#comment-15348395
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68411242
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java ---
@@ -61,19 +68,23 @@ public String getShardId() {
	}

	public String getStartingSequenceNumber() {
-		return shard.getSequenceNumberRange().getStartingSequenceNumber();
+		SequenceNumberRange sequenceNumberRange = shard.getSequenceNumberRange();
+		return (sequenceNumberRange == null) ? null : sequenceNumberRange.getStartingSequenceNumber();
--- End diff --

Okay, I agree!




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348389#comment-15348389
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68410216
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java ---
@@ -61,19 +68,23 @@ public String getShardId() {
	}

	public String getStartingSequenceNumber() {
-		return shard.getSequenceNumberRange().getStartingSequenceNumber();
+		SequenceNumberRange sequenceNumberRange = shard.getSequenceNumberRange();
+		return (sequenceNumberRange == null) ? null : sequenceNumberRange.getStartingSequenceNumber();
--- End diff --

If that is the case, then I don't see the reason for these checks.
In general, I have the feeling that the `KinesisStreamShard` is overly complicated. Maybe it's easier to just have a method `getShard()` instead of proxying everything?


> Handle Kinesis-side resharding in Kinesis streaming consumer
> 
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time for adjustable 
> stream throughput capacity. This article explains this quite clearly: 
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
> mapping is done in a simple round-robin-like distribution which can be 
> locally determined at each Flink consumer task (Flink Kafka consumer does 
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer 
> tasks coordinate which shards they are currently handling, and allow the 
> tasks to ask the coordinator for a shards reassignment when the task finds 
> out it has found a closed shard at runtime (shards will be closed by Kinesis 
> when it is merged and split).
> We need a centralized coordinator state store which is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine what 
> shards it can be reassigned. Amazon KCL uses a DynamoDB table for the 
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
> KCL for the implementation of the consumer if we want to leverage Flink's 
> checkpointing mechanics. For our own implementation, Zookeeper can be used 
> for this state store, but that means it would require the user to set up ZK 
> to work.
> Since this feature introduces extensive work, it is opened as a separate 
> sub-task from the basic implementation 
> https://issues.apache.org/jira/browse/FLINK-3229.
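The round-robin-like, locally determinable distribution mentioned in the description above can be sketched as follows. This is an illustrative sketch, not the connector's actual assignment code; names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a round-robin shard-to-subtask mapping: given the
// globally ordered shard list, each subtask selects every Nth shard locally,
// with no coordination needed between subtasks.
public class RoundRobinShardAssigner {

    /** Returns the subset of shards that the given subtask should consume. */
    public static List<String> assignShards(List<String> allShards, int subtaskIndex, int parallelism) {
        List<String> assigned = new ArrayList<>();
        for (int i = 0; i < allShards.size(); i++) {
            // shard i goes to the subtask whose index equals i modulo parallelism
            if (i % parallelism == subtaskIndex) {
                assigned.add(allShards.get(i));
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<String> shards = List.of("shard-0", "shard-1", "shard-2", "shard-3", "shard-4");
        System.out.println(assignShards(shards, 1, 2)); // subtask 1 of 2
    }
}
```

Because every subtask computes the same deterministic mapping from the same ordered list, no shared state is needed, which is exactly what breaks once Kinesis resharding makes the shard list change at runtime.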



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348386#comment-15348386
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68409807
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java ---
@@ -61,19 +68,23 @@ public String getShardId() {
 	}
 
 	public String getStartingSequenceNumber() {
-		return shard.getSequenceNumberRange().getStartingSequenceNumber();
+		SequenceNumberRange sequenceNumberRange = shard.getSequenceNumberRange();
+		return (sequenceNumberRange == null) ? null : sequenceNumberRange.getStartingSequenceNumber();
--- End diff --

I added these null checks because in the tests, fake `KinesisStreamShards` 
may be created without giving them a SequenceNumberRange / HashNumberRange etc. 
if it wasn't relevant to the test. Perhaps I should revert these null check 
changes, and always give all values for the fake `KinesisStreamShard`s?




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348387#comment-15348387
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68409932
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java ---
@@ -61,19 +68,23 @@ public String getShardId() {
 	}
 
 	public String getStartingSequenceNumber() {
-		return shard.getSequenceNumberRange().getStartingSequenceNumber();
+		SequenceNumberRange sequenceNumberRange = shard.getSequenceNumberRange();
+		return (sequenceNumberRange == null) ? null : sequenceNumberRange.getStartingSequenceNumber();
--- End diff --

The `Shard`s returned from actual Kinesis will never have null values for 
these.




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348383#comment-15348383
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68409331
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -159,42 +217,65 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp
 		return kinesisClient.getShardIterator(shard.getStreamName(), shard.getShardId(), shardIteratorType, startingSeqNum).getShardIterator();
 	}
 
+	private List<KinesisStreamShard> getShardsOfStream(String streamName, String lastSeenShardId) throws InterruptedException {
+		List<KinesisStreamShard> shardsOfStream = new ArrayList<>();
+
+		DescribeStreamResult describeStreamResult;
+		do {
+			describeStreamResult = describeStream(streamName, lastSeenShardId);
+
+			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
+			for (Shard shard : shards) {
+				shardsOfStream.add(new KinesisStreamShard(streamName, shard));
+			}
+
+			if (shards.size() != 0) {
+				lastSeenShardId = shards.get(shards.size() - 1).getShardId();
+			}
+		} while (describeStreamResult.getStreamDescription().isHasMoreShards());
+
+		return shardsOfStream;
+	}
+
 	/**
 	 * Get metainfo for a Kinesis stream, which contains information about which shards this Kinesis stream possess.
 	 *
+	 * This method is using a "full jitter" approach described in
+	 * <a href="http://google.com">https://www.awsarchitectureblog.com/2015/03/backoff.html</a>. This is necessary
+	 * because concurrent calls will be made by all parallel subtask's {@link ShardDiscoverer}s. This jitter backoff
+	 * approach will help distribute calls across the discoverers over time.
+	 *
 	 * @param streamName the stream to describe
 	 * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
 	 * @return the result of the describe stream operation
 	 */
-	private DescribeStreamResult describeStream(String streamName, String startShardId) {
+	private DescribeStreamResult describeStream(String streamName, String startShardId) throws InterruptedException {
 		final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
 		describeStreamRequest.setStreamName(streamName);
 		describeStreamRequest.setExclusiveStartShardId(startShardId);
 
 		DescribeStreamResult describeStreamResult = null;
 		String streamStatus = null;
-		int remainingRetryTimes = Integer.valueOf(
-			configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES, Integer.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_RETRY_TIMES)));
-		long describeStreamBackoffTimeInMillis = Long.valueOf(
-			configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF, Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF)));
 
-		// Call DescribeStream, with backoff and retries (if we get LimitExceededException).
-		while ((remainingRetryTimes >= 0) && (describeStreamResult == null)) {
+		// Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
+		Random seed = null;
+		int attemptCount = 0;
+		while (describeStreamResult == null) { // retry until we get a result
 			try {
 				describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
 				streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
 			} catch (LimitExceededException le) {
-				LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
-					+ describeStreamBackoffTimeInMillis + " millis.");
-				try {
-					Thread.sleep(describeStreamBackoffTimeInMillis);
-				} catch (InterruptedException ie) {
-					LOG.debug("Stream " + streamName + " : Sleep  was interrupted ", ie);
+				if (seed == null) {
+

[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348376#comment-15348376
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68408986
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
@@ -17,156 +17,481 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Map;
-import java.util.HashMap;
+
+import java.util.LinkedList;
 import java.util.List;
-import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A Kinesis Data Fetcher that consumes data from a specific set of Kinesis shards.
- * The fetcher spawns a single thread for connection to each shard.
+ * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards. Each parallel subtask instantiates
+ * and runs a single fetcher throughout the subtask's lifetime. The fetcher runs several threads to accomplish
+ * the following:
+ * <ul>
+ *     <li>1. continously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset
+ *     of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be
+ *     subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe
+ *     to the same subset of shards even after restoring)</li>
+ *     <li>2. decide where in each discovered shard should the fetcher start subscribing to</li>
+ *     <li>3. subscribe to shards by creating a single thread for each shard</li>
+ * </ul>
+ *
+ * <p>The fetcher manages two states: 1) pending shards for subscription, and 2) last processed sequence numbers of
+ * each subscribed shard. All operations on the states in multiple threads should only be done using the handler methods
+ * provided in this class.
  */
-public class KinesisDataFetcher {
+public class KinesisDataFetcher<T> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class);
 
-	/** Config properties for the Flink Kinesis Consumer */
+	// ------------------------------------------------------------------------
+	//  Consumer-wide settings
+	// ------------------------------------------------------------------------
+
+	/** Configuration properties for the Flink Kinesis Consumer */
 	private final Properties configProps;
 
-	/** The name of the consumer task that this fetcher was instantiated */
-	private final String taskName;
+	/** The list of Kinesis streams that the consumer is subscribing to */
+	private final List<String> streams;
+
+	/**
+	 * The deserialization schema we will be using to convert Kinesis records to Flink objects.
+	 * Note that since this might not be thread-safe, multiple threads in the fetcher using this must
+	 * clone a copy using {@link KinesisDataFetcher#getClonedDeserializationSchema()}.
+	 */
+	private final KinesisDeserializationSchema<T> deserializationSchema;
+
+	// ------------------------------------------------------------------------
+	//  Subtask-specific settings
+	// ------------------------------------------------------------------------
+
+	/** Runtime context of the subtask that this fetcher was created in */
+	private final RuntimeContext runtimeContext;
+
+	// ------------------------------------------------------------------------

[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348350#comment-15348350
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68406625
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/KinesisStreamShard.java ---
@@ -61,19 +68,23 @@ public String getShardId() {
 	}
 
 	public String getStartingSequenceNumber() {
-		return shard.getSequenceNumberRange().getStartingSequenceNumber();
+		SequenceNumberRange sequenceNumberRange = shard.getSequenceNumberRange();
+		return (sequenceNumberRange == null) ? null : sequenceNumberRange.getStartingSequenceNumber();
--- End diff --

In what cases is the sequence range null?




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348327#comment-15348327
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68404148
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -159,42 +217,65 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp
 		return kinesisClient.getShardIterator(shard.getStreamName(), shard.getShardId(), shardIteratorType, startingSeqNum).getShardIterator();
 	}
 
+	private List<KinesisStreamShard> getShardsOfStream(String streamName, String lastSeenShardId) throws InterruptedException {
+		List<KinesisStreamShard> shardsOfStream = new ArrayList<>();
+
+		DescribeStreamResult describeStreamResult;
+		do {
+			describeStreamResult = describeStream(streamName, lastSeenShardId);
+
+			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
+			for (Shard shard : shards) {
+				shardsOfStream.add(new KinesisStreamShard(streamName, shard));
+			}
+
+			if (shards.size() != 0) {
+				lastSeenShardId = shards.get(shards.size() - 1).getShardId();
+			}
+		} while (describeStreamResult.getStreamDescription().isHasMoreShards());
+
+		return shardsOfStream;
+	}
+
 	/**
 	 * Get metainfo for a Kinesis stream, which contains information about which shards this Kinesis stream possess.
 	 *
+	 * This method is using a "full jitter" approach described in
+	 * <a href="http://google.com">https://www.awsarchitectureblog.com/2015/03/backoff.html</a>. This is necessary
+	 * because concurrent calls will be made by all parallel subtask's {@link ShardDiscoverer}s. This jitter backoff
+	 * approach will help distribute calls across the discoverers over time.
+	 *
 	 * @param streamName the stream to describe
 	 * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result)
 	 * @return the result of the describe stream operation
 	 */
-	private DescribeStreamResult describeStream(String streamName, String startShardId) {
+	private DescribeStreamResult describeStream(String streamName, String startShardId) throws InterruptedException {
 		final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
 		describeStreamRequest.setStreamName(streamName);
 		describeStreamRequest.setExclusiveStartShardId(startShardId);
 
 		DescribeStreamResult describeStreamResult = null;
 		String streamStatus = null;
-		int remainingRetryTimes = Integer.valueOf(
-			configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES, Integer.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_RETRY_TIMES)));
-		long describeStreamBackoffTimeInMillis = Long.valueOf(
-			configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF, Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF)));
 
-		// Call DescribeStream, with backoff and retries (if we get LimitExceededException).
-		while ((remainingRetryTimes >= 0) && (describeStreamResult == null)) {
+		// Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
+		Random seed = null;
+		int attemptCount = 0;
+		while (describeStreamResult == null) { // retry until we get a result
 			try {
 				describeStreamResult = kinesisClient.describeStream(describeStreamRequest);
 				streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
 			} catch (LimitExceededException le) {
-				LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
-					+ describeStreamBackoffTimeInMillis + " millis.");
-				try {
-					Thread.sleep(describeStreamBackoffTimeInMillis);
-				} catch (InterruptedException ie) {
-					LOG.debug("Stream " + streamName + " : Sleep  was interrupted ", ie);
+				if (seed == null) {
+

[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348323#comment-15348323
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68403918
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -206,4 +287,9 @@ private DescribeStreamResult describeStream(String streamName, String startShard
 		throw new RuntimeException("Stream is not Active or Updating");
--- End diff --

Can you put the actual stream status into the exception?
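A possible shape for that suggestion, sketched below. The `streamStatus` variable is already in scope at that point in the diff; the factory method and message wording here are illustrative, not the actual patch:

```java
// Hypothetical sketch: surface the observed stream status in the exception
// message, so operators can see *why* the stream was rejected.
public class StreamStatusError {

    public static RuntimeException unexpectedStatus(String streamName, String streamStatus) {
        return new RuntimeException(
            "Stream " + streamName + " is not Active or Updating; current status: " + streamStatus);
    }

    public static void main(String[] args) {
        System.out.println(unexpectedStatus("myStream", "DELETING").getMessage());
    }
}
```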




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348318#comment-15348318
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68403594
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -86,32 +141,28 @@ public KinesisProxy(Properties configProps) {
 	 * @param maxRecordsToGet the maximum amount of records to retrieve for this batch
 	 * @return the batch of retrieved records
 	 */
-	public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
+	public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException {
 		final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
 		getRecordsRequest.setShardIterator(shardIterator);
 		getRecordsRequest.setLimit(maxRecordsToGet);
 
 		GetRecordsResult getRecordsResult = null;
 
-		int remainingRetryTimes = Integer.valueOf(
-			configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES, Integer.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_RETRY_TIMES)));
-		long describeStreamBackoffTimeInMillis = Long.valueOf(
-			configProps.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF, Long.toString(KinesisConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF)));
-
-		int i=0;
-		while (i <= remainingRetryTimes && getRecordsResult == null) {
+		Random seed = null;
+		int attempt = 0;
+		while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
 			try {
 				getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
 			} catch (ProvisionedThroughputExceededException ex) {
-				LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
-					+ describeStreamBackoffTimeInMillis + " millis.");
-				try {
-					Thread.sleep(describeStreamBackoffTimeInMillis);
-				} catch (InterruptedException interruptEx) {
-					//
+				if (seed == null) {
+					seed = new Random();
--- End diff --

Is there a reason for creating a new RNG with every `getRecords` call?
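For reference, the "full jitter" scheme being discussed can be sketched as below. Each retry sleeps for a uniformly random duration in `[0, min(cap, base * 2^attempt))`; method names and constants are illustrative, not the connector's actual code:

```java
import java.util.Random;

// Hypothetical sketch of full-jitter exponential backoff.
public class FullJitterBackoff {

    public static long backoffMillis(Random rnd, long baseMillis, long capMillis, int attempt) {
        // exponential ceiling, capped to bound the wait and avoid long overflow
        long ceiling = Math.min(capMillis, baseMillis * (1L << Math.min(attempt, 30)));
        // "full jitter": draw uniformly from the whole interval [0, ceiling)
        return (long) (rnd.nextDouble() * ceiling);
    }

    public static void main(String[] args) {
        // one Random instance reused across all attempts
        Random rnd = new Random();
        for (int attempt = 0; attempt < 5; attempt++) {
            System.out.println("attempt " + attempt + " -> sleep "
                + backoffMillis(rnd, 100, 10_000, attempt) + " ms");
        }
    }
}
```

Note the sketch reuses a single `Random` across all attempts; instantiating one per call adds no randomness benefit, which is presumably the point of the question above.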



[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348310#comment-15348310
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68402254
  
--- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -159,42 +217,65 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp
 		return kinesisClient.getShardIterator(shard.getStreamName(), shard.getShardId(), shardIteratorType, startingSeqNum).getShardIterator();
 	}
 
+	private List<KinesisStreamShard> getShardsOfStream(String streamName, String lastSeenShardId) throws InterruptedException {
+		List<KinesisStreamShard> shardsOfStream = new ArrayList<>();
+
+		DescribeStreamResult describeStreamResult;
+		do {
+			describeStreamResult = describeStream(streamName, lastSeenShardId);
+
+			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
+			for (Shard shard : shards) {
+				shardsOfStream.add(new KinesisStreamShard(streamName, shard));
+			}
+
+			if (shards.size() != 0) {
+				lastSeenShardId = shards.get(shards.size() - 1).getShardId();
+			}
+		} while (describeStreamResult.getStreamDescription().isHasMoreShards());
+
+		return shardsOfStream;
+	}
+
 	/**
 	 * Get metainfo for a Kinesis stream, which contains information about which shards this Kinesis stream possess.
 	 *
+	 * This method is using a "full jitter" approach described in
+	 * <a href="http://google.com">https://www.awsarchitectureblog.com/2015/03/backoff.html</a>. This is necessary
--- End diff --

Why is this linking to google.com?


> Handle Kinesis-side resharding in Kinesis streaming consumer
> 
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time for adjustable 
> stream throughput capacity. This article explains this quite clearly: 
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
> mapping is done in a simple round-robin-like distribution which can be 
> locally determined at each Flink consumer task (Flink Kafka consumer does 
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer 
> tasks coordinate which shards they are currently handling, and allow the 
> tasks to ask the coordinator for a shards reassignment when the task finds 
> out it has found a closed shard at runtime (shards will be closed by Kinesis 
> when it is merged and split).
> We need a centralized coordinator state store which is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine what 
> shards it can be reassigned. Amazon KCL uses a DynamoDB table for the 
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
> KCL for the implementation of the consumer if we want to leverage Flink's 
> checkpointing mechanics. For our own implementation, Zookeeper can be used 
> for this state store, but that means it would require the user to set up ZK 
> to work.
> Since this feature introduces extensive work, it is opened as a separate 
> sub-task from the basic implementation 
> https://issues.apache.org/jira/browse/FLINK-3229.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
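The issue description above mentions that the static shard-to-task mapping is a "simple round-robin-like distribution which can be locally determined at each Flink consumer task". A minimal sketch of that idea, with illustrative names (not the connector's actual code): every subtask computes the same globally ordered shard list and keeps exactly the shards whose index modulo the parallelism equals its own subtask index, so no coordination is needed and no two subtasks overlap.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of locally-determined round-robin shard assignment.
class RoundRobinShardAssignment {

    /** Return the subset of shards this subtask should consume. */
    public static List<String> shardsForSubtask(
            List<String> allShardsSorted, int parallelism, int subtaskIndex) {
        List<String> assigned = new ArrayList<>();
        for (int i = 0; i < allShardsSorted.size(); i++) {
            // Deterministic: depends only on the shared ordering, the
            // parallelism, and this subtask's index.
            if (i % parallelism == subtaskIndex) {
                assigned.add(allShardsSorted.get(i));
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<String> shards = List.of("shard-0", "shard-1", "shard-2", "shard-3", "shard-4");
        // Subtask 1 of 2 gets the odd-indexed shards.
        System.out.println(shardsForSubtask(shards, 2, 1)); // [shard-1, shard-3]
    }
}
```

The weakness the issue describes follows directly: the scheme is only correct while the shard list is static, which Kinesis resharding breaks.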


[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348295#comment-15348295
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68400657
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -122,29 +173,36 @@ public GetRecordsResult getRecords(String 
shardIterator, int maxRecordsToGet) {
}
 
/**
-* Get the list of shards associated with multiple Kinesis streams
+* Get the complete shard list of multiple Kinesis streams.
 *
-* @param streamNames the list of Kinesis streams
-* @return a list of {@link KinesisStreamShard}s
+* @param streamNames Kinesis streams to retrieve the shard list for
+* @return shard list result
 */
-   public List<KinesisStreamShard> getShardList(List<String> streamNames) {
-   List<KinesisStreamShard> shardList = new ArrayList<>();
+   public GetShardListResult getShardList(List<String> streamNames) throws 
InterruptedException {
--- End diff --

We can probably drop this unused method.




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348293#comment-15348293
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68400596
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/IKinesisProxy.java
 ---
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+
+import java.util.List;
+import java.util.Map;
+
+public interface IKinesisProxy {
+   GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) 
throws InterruptedException;
+   GetShardListResult getShardList(List<String> streamNames) throws 
InterruptedException;
--- End diff --

This method is unused.




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348280#comment-15348280
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68399454
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -122,29 +173,36 @@ public GetRecordsResult getRecords(String 
shardIterator, int maxRecordsToGet) {
}
 
/**
-* Get the list of shards associated with multiple Kinesis streams
+* Get the complete shard list of multiple Kinesis streams.
 *
-* @param streamNames the list of Kinesis streams
-* @return a list of {@link KinesisStreamShard}s
+* @param streamNames Kinesis streams to retrieve the shard list for
+* @return shard list result
 */
-   public List<KinesisStreamShard> getShardList(List<String> streamNames) {
-   List<KinesisStreamShard> shardList = new ArrayList<>();
+   public GetShardListResult getShardList(List<String> streamNames) throws 
InterruptedException {
--- End diff --

Missing `@Override` annotation




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348281#comment-15348281
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68399467
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -122,29 +173,36 @@ public GetRecordsResult getRecords(String 
shardIterator, int maxRecordsToGet) {
}
 
/**
-* Get the list of shards associated with multiple Kinesis streams
+* Get the complete shard list of multiple Kinesis streams.
 *
-* @param streamNames the list of Kinesis streams
-* @return a list of {@link KinesisStreamShard}s
+* @param streamNames Kinesis streams to retrieve the shard list for
+* @return shard list result
 */
-   public List<KinesisStreamShard> getShardList(List<String> streamNames) {
-   List<KinesisStreamShard> shardList = new ArrayList<>();
+   public GetShardListResult getShardList(List<String> streamNames) throws 
InterruptedException {
+   GetShardListResult result = new GetShardListResult();
 
for (String stream : streamNames) {
-   DescribeStreamResult describeStreamResult;
-   String lastSeenShardId = null;
+   result.addRetrievedShardsToStream(stream, 
getShardsOfStream(stream, null));
+   }
+   return result;
+   }
 
-   do {
-   describeStreamResult = describeStream(stream, 
lastSeenShardId);
+   /**
+* Get shard list of multiple Kinesis streams, ignoring the
+* shards of each stream before a specified last seen shard id.
+*
+* @param streamNamesWithLastSeenShardIds a map with stream as key, and 
last seen shard id as value
+* @return shard list result
+*/
+   public GetShardListResult getShardList(Map<String, String> 
streamNamesWithLastSeenShardIds) throws InterruptedException {
--- End diff --

Missing `@Override` annotation


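The `Map`-based `getShardList` overload discussed in the diff above takes a last seen shard id per stream, so each discovery round only returns shards that appeared after the previous round. The mechanics can be sketched with a standalone toy (illustrative names, an in-memory stand-in for Kinesis instead of real `DescribeStream` calls):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of incremental shard discovery via last-seen markers.
class IncrementalShardDiscovery {

    /** Stand-in for Kinesis: a fixed, ordered shard id list per stream. */
    private final Map<String, List<String>> streamToShards;

    /** Last seen shard id per stream; absent means "start from the beginning". */
    private final Map<String, String> lastSeenShardIds = new HashMap<>();

    IncrementalShardDiscovery(Map<String, List<String>> streamToShards) {
        this.streamToShards = streamToShards;
    }

    /** Return only shards not seen in previous rounds, and advance the markers. */
    public List<String> discoverNewShards() {
        List<String> newShards = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : streamToShards.entrySet()) {
            List<String> shards = e.getValue();
            String lastSeen = lastSeenShardIds.get(e.getKey());
            // Resume right after the marker, mimicking ExclusiveStartShardId.
            int start = (lastSeen == null) ? 0 : shards.indexOf(lastSeen) + 1;
            for (int i = start; i < shards.size(); i++) {
                newShards.add(shards.get(i));
            }
            if (!shards.isEmpty()) {
                lastSeenShardIds.put(e.getKey(), shards.get(shards.size() - 1));
            }
        }
        return newShards;
    }
}
```

A second call with an unchanged shard list returns nothing, which is exactly what a periodic resharding-discovery loop wants: work only when Kinesis has actually split or merged shards.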


[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348279#comment-15348279
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68399431
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -86,32 +141,28 @@ public KinesisProxy(Properties configProps) {
 * @param maxRecordsToGet the maximum amount of records to retrieve for 
this batch
 * @return the batch of retrieved records
 */
-   public GetRecordsResult getRecords(String shardIterator, int 
maxRecordsToGet) {
+   public GetRecordsResult getRecords(String shardIterator, int 
maxRecordsToGet) throws InterruptedException {
--- End diff --

Missing `@Override` annotation




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348274#comment-15348274
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68399389
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/IKinesisProxy.java
 ---
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+
+import java.util.List;
+import java.util.Map;
+
+public interface IKinesisProxy {
--- End diff --

The `I` prefix is not commonly used within Flink.
I would suggest `KinesisProxyInterface` as a name here.




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348255#comment-15348255
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68397478
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -17,156 +17,481 @@
 
 package org.apache.flink.streaming.connectors.kinesis.internals;
 
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
-import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
 import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Map;
-import java.util.HashMap;
+
+import java.util.LinkedList;
 import java.util.List;
-import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A Kinesis Data Fetcher that consumes data from a specific set of 
Kinesis shards.
- * The fetcher spawns a single thread for connection to each shard.
+ * A KinesisDataFetcher is responsible for fetching data from multiple 
Kinesis shards. Each parallel subtask instantiates
+ * and runs a single fetcher throughout the subtask's lifetime. The 
fetcher runs several threads to accomplish
+ * the following:
+ * 
+ * 1. continuously poll Kinesis to discover shards that the subtask 
should subscribe to. The subscribed subset
+ *   of shards, including future new shards, is 
non-overlapping across subtasks (no two subtasks will be
+ *   subscribed to the same shard) and determinate across 
subtask restores (the subtask will always subscribe
+ *   to the same subset of shards even after 
restoring)
+ * 2. decide where in each discovered shard should the fetcher 
start subscribing to
+ * 3. subscribe to shards by creating a single thread for each 
shard
+ * 
+ *
+ * The fetcher manages two states: 1) pending shards for subscription, 
and 2) last processed sequence numbers of
+ * each subscribed shard. All operations on the states in multiple threads 
should only be done using the handler methods
+ * provided in this class.
  */
-public class KinesisDataFetcher {
+public class KinesisDataFetcher<T> {
 
private static final Logger LOG = 
LoggerFactory.getLogger(KinesisDataFetcher.class);
 
-   /** Config properties for the Flink Kinesis Consumer */
+   // ------------------------------------------------------------------------
+   //  Consumer-wide settings
+   // ------------------------------------------------------------------------
+
+   /** Configuration properties for the Flink Kinesis Consumer */
private final Properties configProps;
 
-   /** The name of the consumer task that this fetcher was instantiated */
-   private final String taskName;
+   /** The list of Kinesis streams that the consumer is subscribing to */
+   private final List<String> streams;
+
+   /**
+* The deserialization schema we will be using to convert Kinesis 
records to Flink objects.
+* Note that since this might not be thread-safe, multiple threads in 
the fetcher using this must
+* clone a copy using {@link 
KinesisDataFetcher#getClonedDeserializationSchema()}.
+*/
+   private final KinesisDeserializationSchema<T> deserializationSchema;
+
+   // ------------------------------------------------------------------------
+   //  Subtask-specific settings
+   // ------------------------------------------------------------------------
+
+   /** Runtime context of the subtask that this fetcher was created in */
+   private final RuntimeContext runtimeContext;
+
+   // ------------------------------------------------------------------------

[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15348158#comment-15348158
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2131
  
I'll review your change now.




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346830#comment-15346830
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2131
  
@rmetzger tagging just to make sure you're notified of this PR :)
When will you be free to review? I'm asking just to plan my own time for 
continuing work on the Kinesis connector. If there's anything majorly wrong 
with the implementation explained in the above comment, please let me know and 
I'll try to address it before you spend effort on a detailed review.
Thanks in advance!




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15339810#comment-15339810
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2131
  
The ManualExactlyOnceTest and ManualExactlyOnceWithStreamReshardingTest are 
a bit buggy; fixing with a follow-up commit ...




[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15338612#comment-15338612
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/2131

[FLINK-3231][streaming-connectors] FlinkKinesisConsumer rework to handle 
Kinesis resharding

This change attempts to solve 2 issues:
1. [FLINK-3231](https://issues.apache.org/jira/browse/FLINK-3231): Handle 
Kinesis-side resharding.
2. [FLINK-4020](https://issues.apache.org/jira/browse/FLINK-4020): Remove 
shard list querying from Kinesis consumer constructor.

Some notes on the implementation:
- Each subtask has a thread that continuously polls for changes in the 
Kinesis stream, and uses exponential backoff with jitter to try to even out the 
concurrent Kinesis client describeStream operations across subtasks. 
Continuous polling is necessary because there's currently no way to "signal" 
a subtask that it has a new shard it should be subscribing to.
- A big change is that all subtasks run a fetcher that continues to poll 
for shards, even if the subtask initially has no shards to consume 
(before, a MAX_VALUE watermark was sent out).
- Apart from the unit tests, I've manually tested this with 
`ManualExactlyOnceWithStreamReshardingTest`. However, since the 
`FlinkKinesisProducer` currently has a problem where records are resent when 
Kinesis streams are resharded (which prevents the exactly-once test from 
passing at all), this manual test uses a normal event generator instead of a 
producer topology running the `FlinkKinesisProducer`.
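The backoff behavior described in the first note can be sketched roughly as 
follows. This is a minimal "full jitter" illustration, not the actual connector 
code; the class and method names and the constants are made up for this sketch:

```java
import java.util.concurrent.ThreadLocalRandom;

public class DescribeStreamBackoff {
    // "Full jitter" exponential backoff: for attempt n, sleep a random time in
    // [0, min(cap, base * 2^n)], so concurrent describeStream callers from
    // different subtasks spread out instead of retrying in lock-step.
    static long backoffMillis(long baseMillis, long capMillis, int attempt) {
        long exp = baseMillis << Math.min(attempt, 20); // cap the shift to avoid overflow
        long bound = Math.min(capMillis, exp);
        return ThreadLocalRandom.current().nextLong(bound + 1);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 5; attempt++) {
            System.out.println("attempt " + attempt + " -> sleep "
                + backoffMillis(100, 5000, attempt) + " ms");
        }
    }
}
```

The jitter matters more than the exact curve: without it, all subtasks that 
started polling at the same time would hit the describeStream rate limit 
together on every retry.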

Since this PR introduces considerable rework on the Kinesis consumer, I'll 
wait until this is merged before submitting 
[FLINK-4080](https://issues.apache.org/jira/browse/FLINK-4080) & 
[FLINK-4019](https://issues.apache.org/jira/browse/FLINK-4019).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-3231

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2131.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2131


commit 378ec8177f1bfe91b459233a8ce02e9f988c61ab
Author: Gordon Tai 
Date:   2016-06-08T10:46:02Z

[FLINK-4020] Move shard list querying to open() for Kinesis consumer

commit 2c9f1304d5f6220fe36ad9d7833a506651f3fee6
Author: Gordon Tai 
Date:   2016-06-19T16:15:43Z

[FLINK-3231] FlinkKinesisConsumer rework to handle Kinesis resharding





[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-09 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15322524#comment-15322524
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3231:


Update for this:
Had a talk with [~rmetzger] about the implementation for this. We should be 
able to avoid inter-subtask coordination and any external state store by 
letting all subtasks poll for new shards, with each subtask determining which 
shards it is in charge of consuming based on a hash of the stream name + shard 
id of each shard.
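A minimal sketch of that local assignment decision (hypothetical names; the 
real implementation may hash differently):

```java
public class ShardAssignment {
    // A subtask claims a shard iff the hash of (stream name + shard id),
    // taken modulo the total number of subtasks, equals its own index.
    // Every subtask can evaluate this locally, with no coordination,
    // and all subtasks agree on the owner of any given shard.
    static int ownerSubtask(String streamName, String shardId, int totalSubtasks) {
        // floorMod keeps the result in [0, totalSubtasks) even for negative hashes
        return Math.floorMod((streamName + shardId).hashCode(), totalSubtasks);
    }

    public static void main(String[] args) {
        int parallelism = 3;
        for (String shard : new String[]{"shardId-000000000000",
                                         "shardId-000000000001",
                                         "shardId-000000000002"}) {
            System.out.println(shard + " -> subtask "
                + ownerSubtask("my-stream", shard, parallelism));
        }
    }
}
```

When a shard is split or merged, the new child shards simply hash to some 
subtask, which discovers them on its next poll and starts consuming them.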



[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-04 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15315494#comment-15315494
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3231:


Hi Stephan,

Finally reached the point where we could start working on this. I'm currently 
in the design phase for this feature.

Merged state on restore sounds interesting (is there currently any doc / thread 
/ JIRA tracking this merged-state feature?).

However, I don't think it will be able to fully solve the described problem for 
this JIRA, unless we are expecting the streaming job to fail and restore every 
time resharding happens. We still need coordination between the subtasks to 
gracefully handle the resharding. 

On the other hand, if the merged state feature also provides access (including 
incomplete checkpoints) from subtasks during job execution, then it might be 
possible to figure out an implementation.



[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-01-15 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15101902#comment-15101902
 ] 

Stephan Ewen commented on FLINK-3231:
-

What we could start adding to Flink is a kind of state that is globally merged.

For example:
  - Source subtask 1 checkpoints state (shard1 - offset 56)
  - Source subtask 2 checkpoints state (shard2 - offset 42)
  - Source subtask 3 checkpoints state (shard3 - offset 17)

The checkpoint coordinator makes one state out of that: [ (shard1 - offset 56) 
, (shard2 - offset 42) , (shard3 - offset 17) ].
On restore, all tasks get the full state.

Let's say we restore the job and the assignment changed such that source 
subtask 1 now gets shard1 and 2. It has all required offsets to start working 
from that union of shards and not introduce duplicates.
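The restore scenario above can be sketched like this. It is an illustrative 
toy, not the Flink API — the coordinator-side merge is just a map union over 
the per-subtask (shard -> offset) states:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MergedRestoreState {
    // Coordinator-side merge: union of every subtask's (shard -> offset) state.
    static Map<String, Long> merge(List<Map<String, Long>> subtaskStates) {
        Map<String, Long> merged = new TreeMap<>();
        subtaskStates.forEach(merged::putAll);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Long> merged = merge(List.of(
            Map.of("shard1", 56L),   // checkpointed by subtask 1
            Map.of("shard2", 42L),   // checkpointed by subtask 2
            Map.of("shard3", 17L))); // checkpointed by subtask 3
        // On restore, every subtask receives the full merged map, so a subtask
        // that is now assigned both shard1 and shard2 can resume each shard
        // from its recorded offset without reading duplicates.
        System.out.println(merged);
    }
}
```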


