[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16071078#comment-16071078
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3746


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.4.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer group of the subscribed topics.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069669#comment-16069669
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3746
  
(please ignore the batch of commits. I'm just using this branch for an 
overall Travis run before committing the batch; personal accounts get time-outs 
too constantly)


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.4.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069437#comment-16069437
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3746
  
Travis passed, merging ...


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.4.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer group of the subscribed topics.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16068277#comment-16068277
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3746
  
Thanks for the review @aljoscha. Will address the comment and merge after 
Travis gives green :)


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.4.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer group of the subscribed topics.



--
This message was sent by Atlassian 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16068273#comment-16068273
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3746#discussion_r124785388
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -189,6 +192,8 @@
 * @param topics fixed list of topics to subscribe to (null, if using 
topic pattern)
 * @param topicPattern the topic pattern to subscribe to (null, if 
using fixed topics)
 * @param deserializer The deserializer to turn raw byte messages into 
Java/Scala objects.
+* @param discoveryIntervalMillis the topic / partition discovery 
interval, in
+*milliseconds (0 if discovery is 
disabled).
--- End diff --

Here it says `0` for disabled while in other places it's `Long.MIN_VALUE`.


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.4.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16068095#comment-16068095
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3746
  
Hi @aljoscha, I've rebased this PR, fixed the previous blocking issue 
(discovery enabling & restoring from round-robin list state), and added 
documentation for the limitation. Could you have another look? Thanks!


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.4.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015457#comment-16015457
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3746
  
Hi @haohui, I revisited this, but I think there is no easy workaround for 
this, unfortunately.
This can only be properly merged in 1.4 after some additional requirements 
are added to Flink.

However, if you know that during Flink 1.2 your FlinkKafkaConsumer was not 
rescaled before, then in that case specifically you should be able to use this 
patch. It would however require a custom build of the connector on your side.


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16013493#comment-16013493
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3746
  
@haohui I can not guarantee this, but I can try revisiting this for a 
workaround after I've done some testing on the current RC1.


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer group of the subscribed topics.



--
This 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16013303#comment-16013303
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3746
  
@tzulitai we are interested in putting this feature in production soon. Do 
you think whether it is possible for us to get a workaround for 1.3? Thanks.


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer group of the 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-05-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009195#comment-16009195
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3746
  
Hi @haohui, unfortunately we might not be able to merge this for 1.3. The 
reason is that the feature requires a migration from partitionable list state 
to union list state, which currently can not be properly done, as can be seen 
in Aljoscha's inline comment.


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-05-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009191#comment-16009191
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3746#discussion_r116352819
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -503,23 +644,30 @@ public void close() throws Exception {
public void initializeState(FunctionInitializationContext context) 
throws Exception {
 
OperatorStateStore stateStore = context.getOperatorStateStore();
-   offsetsStateForCheckpoint = 
stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
 
-   if (context.isRestored()) {
-   if (restoredState == null) {
-   restoredState = new HashMap<>();
-   for (Tuple2 
kafkaOffset : offsetsStateForCheckpoint.get()) {
-   restoredState.put(kafkaOffset.f0, 
kafkaOffset.f1);
-   }
+   ListState> 
oldRoundRobinListState =
+   
stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
 
-   LOG.info("Setting restore state in the 
FlinkKafkaConsumer.");
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Using the following offsets: 
{}", restoredState);
-   }
+   this.unionOffsetStates = stateStore.getUnionListState(new 
ListStateDescriptor<>(
+   OFFSETS_STATE_NAME,
+   TypeInformation.of(new 
TypeHint>() {})));
+
+   if (context.isRestored() && !restoredFromOldState) {
+   restoredState = new TreeMap<>(new 
KafkaTopicPartition.Comparator());
+
+   // migrate from 1.2 state, if there is any
+   for (Tuple2 kafkaOffset : 
oldRoundRobinListState.get()) {
+   restoredFromOldState = true;
--- End diff --

You're right, this is definitely problematic. I think we'll have to block 
this feature until we have the required state information accessible before we 
can merge this then.


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16006678#comment-16006678
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3746#discussion_r116000971
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -503,23 +644,30 @@ public void close() throws Exception {
public void initializeState(FunctionInitializationContext context) 
throws Exception {
 
OperatorStateStore stateStore = context.getOperatorStateStore();
-   offsetsStateForCheckpoint = 
stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
 
-   if (context.isRestored()) {
-   if (restoredState == null) {
-   restoredState = new HashMap<>();
-   for (Tuple2 
kafkaOffset : offsetsStateForCheckpoint.get()) {
-   restoredState.put(kafkaOffset.f0, 
kafkaOffset.f1);
-   }
+   ListState> 
oldRoundRobinListState =
+   
stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
 
-   LOG.info("Setting restore state in the 
FlinkKafkaConsumer.");
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Using the following offsets: 
{}", restoredState);
-   }
+   this.unionOffsetStates = stateStore.getUnionListState(new 
ListStateDescriptor<>(
+   OFFSETS_STATE_NAME,
+   TypeInformation.of(new 
TypeHint>() {})));
+
+   if (context.isRestored() && !restoredFromOldState) {
+   restoredState = new TreeMap<>(new 
KafkaTopicPartition.Comparator());
+
+   // migrate from 1.2 state, if there is any
+   for (Tuple2 kafkaOffset : 
oldRoundRobinListState.get()) {
+   restoredFromOldState = true;
--- End diff --

Could it be that we restore from an old 1.2 snapshot and don't get anything 
here because we simply weren't assigned any state. (For example because the 
parallelism is higher than before.)


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16006677#comment-16006677
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3746#discussion_r115996970
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 ---
@@ -106,31 +139,69 @@ protected AbstractFetcher(
}
}
 
-   // create our partition state according to the 
timestamp/watermark mode 
-   this.subscribedPartitionStates = 
initializeSubscribedPartitionStates(
-   assignedPartitionsWithInitialOffsets,
+   this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
+
+   // initialize subscribed partition states with seed partitions
+   this.subscribedPartitionStates = createPartitionStateHolders(
+   seedPartitionsWithInitialOffsets,
timestampWatermarkMode,
-   watermarksPeriodic, watermarksPunctuated,
+   watermarksPeriodic,
+   watermarksPunctuated,
userCodeClassLoader);
 
-   // check that all partition states have a defined offset
+   // check that all seed partition states have a defined offset
for (KafkaTopicPartitionState partitionState : 
subscribedPartitionStates) {
if (!partitionState.isOffsetDefined()) {
-   throw new IllegalArgumentException("The fetcher 
was assigned partitions with undefined initial offsets.");
+   throw new IllegalArgumentException("The fetcher 
was assigned seed partitions with undefined initial offsets.");
}
}
-   
+
+   // all seed partitions are not assigned yet, so should be added 
to the unassigned partitions queue
+   for (KafkaTopicPartitionState partition : 
subscribedPartitionStates) {
+   unassignedPartitionsQueue.add(partition);
+   }
+
// if we have periodic watermarks, kick off the interval 
scheduler
if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
-   KafkaTopicPartitionStateWithPeriodicWatermarks[] 
parts = 
-   
(KafkaTopicPartitionStateWithPeriodicWatermarks[]) 
subscribedPartitionStates;
-   
-   PeriodicWatermarkEmitter periodicEmitter = 
-   new PeriodicWatermarkEmitter(parts, 
sourceContext, processingTimeProvider, autoWatermarkInterval);
+   @SuppressWarnings("unchecked")
+   PeriodicWatermarkEmitter periodicEmitter = new 
PeriodicWatermarkEmitter(
--- End diff --

Could add the generic parameter `KPH` here.


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16006679#comment-16006679
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3746#discussion_r115997026
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 ---
@@ -106,31 +139,69 @@ protected AbstractFetcher(
}
}
 
-   // create our partition state according to the 
timestamp/watermark mode 
-   this.subscribedPartitionStates = 
initializeSubscribedPartitionStates(
-   assignedPartitionsWithInitialOffsets,
+   this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
+
+   // initialize subscribed partition states with seed partitions
+   this.subscribedPartitionStates = createPartitionStateHolders(
+   seedPartitionsWithInitialOffsets,
timestampWatermarkMode,
-   watermarksPeriodic, watermarksPunctuated,
+   watermarksPeriodic,
+   watermarksPunctuated,
userCodeClassLoader);
 
-   // check that all partition states have a defined offset
+   // check that all seed partition states have a defined offset
for (KafkaTopicPartitionState partitionState : 
subscribedPartitionStates) {
--- End diff --

Could add generic parameter, but is already existing code.


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-05-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16004093#comment-16004093
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3746
  
@haohui you should be able to the PR now, would be great to hear feedback 
on this.
As of now I'm not quite sure whether this will make it into 1.3 though; I 
think we'll have to see about the status of the 1.3 release in a while.


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-05-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16003796#comment-16003796
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3746
  
Any updates on this? Would love to try this out :-)


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer group of the subscribed topics.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996987#comment-15996987
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3746#discussion_r114821998
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -219,6 +209,15 @@ public void run() {
}
}
 
+   try {
+   newPartitions = 
unassignedPartitionsQueue.pollBatch();
+   if (newPartitions != null) {
+   
reassignPartitions(newPartitions);
+   }
+   } catch (AbortedReassignmentException e) {
+   continue;
--- End diff --

Thanks! that makes sense



> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996882#comment-15996882
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3746#discussion_r114803295
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -219,6 +209,15 @@ public void run() {
}
}
 
+   try {
+   newPartitions = 
unassignedPartitionsQueue.pollBatch();
+   if (newPartitions != null) {
+   
reassignPartitions(newPartitions);
+   }
+   } catch (AbortedReassignmentException e) {
+   continue;
--- End diff --

One extra remark:

the external `wakeup` call was intended to un-block the main thread from 
any Kafka blocking operation, and commit offsets as soon as possible (which 
happens at the start of the loop). It's also for making sure that the main 
thread stops work as soon as possible so that there isn't any strange long job 
shutdown issues (which occurred before).

Hence it's important to restore effective wakeup calls (or mimic their 
behaviour) and fall through the loop when an external operation did call wakeup.


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996865#comment-15996865
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3746
  
@aljoscha thanks for the review! Yes, the Kafka 0.8 code hasn't been 
touched much, and the `KafkaPartitionDiscoverer08` is mainly just a copy of the 
previous partition fetching code with only minor refactoring.


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996862#comment-15996862
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3746#discussion_r114795536
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -219,6 +209,15 @@ public void run() {
}
}
 
+   try {
+   newPartitions = 
unassignedPartitionsQueue.pollBatch();
+   if (newPartitions != null) {
+   
reassignPartitions(newPartitions);
+   }
+   } catch (AbortedReassignmentException e) {
+   continue;
--- End diff --

the reason is that the `AbortedReassignmentException` is thrown only if the 
Kafka consumer was `wakeup` before the reassignment started and that wakeup 
effect was consumed by an operation during the reassignment.

Now, if the loop just continues, the next Kafka operation (the `poll`) 
would not be woken up because the wakeup was already consumed.
So, effectively, it would mean the same thing to do this in the catch block 
instead of falling through the loop:
```
} catch (AbortedReassignmentException e) {
consumer.wakeup();
}
```

I'm just eagerly continuing instead of restoring the wakeup. Does that make 
sense?


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996839#comment-15996839
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3746#discussion_r114793528
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -424,65 +485,136 @@ public void run(SourceContext sourceContext) 
throws Exception {
throw new Exception("The partitions were not set for 
the consumer");
}
 
-   // we need only do work, if we actually have partitions assigned
-   if (!subscribedPartitionsToStartOffsets.isEmpty()) {
-
-   // create the fetcher that will communicate with the 
Kafka brokers
-   final AbstractFetcher fetcher = createFetcher(
-   sourceContext,
-   subscribedPartitionsToStartOffsets,
-   periodicWatermarkAssigner,
-   punctuatedWatermarkAssigner,
-   (StreamingRuntimeContext) 
getRuntimeContext(),
-   offsetCommitMode);
-
-   // publish the reference, for snapshot-, commit-, and 
cancel calls
-   // IMPORTANT: We can only do that now, because only now 
will calls to
-   //the fetchers 'snapshotCurrentState()' 
method return at least
-   //the restored offsets
-   this.kafkaFetcher = fetcher;
-   if (!running) {
-   return;
-   }
-   
-   // (3) run the fetcher' main work method
-   fetcher.runFetchLoop();
+   this.runThread = Thread.currentThread();
+
+   // mark the subtask as temporarily idle if there are no initial 
seed partitions;
+   // once this subtask discovers some partitions and starts 
collecting records, the subtask's
+   // status will automatically be triggered back to be active.
+   if (subscribedPartitionsToStartOffsets.isEmpty()) {
+   sourceContext.markAsTemporarilyIdle();
}
-   else {
-   // this source never completes, so emit a 
Long.MAX_VALUE watermark
-   // to not block watermark forwarding
-   sourceContext.emitWatermark(new 
Watermark(Long.MAX_VALUE));
 
-   // wait until this is canceled
-   final Object waitLock = new Object();
+   // create the fetcher that will communicate with the Kafka 
brokers
+   final AbstractFetcher fetcher = createFetcher(
+   sourceContext,
+   subscribedPartitionsToStartOffsets,
+   periodicWatermarkAssigner,
+   punctuatedWatermarkAssigner,
+   (StreamingRuntimeContext) getRuntimeContext(),
+   offsetCommitMode);
+
+   // publish the reference, for snapshot-, commit-, and cancel 
calls
+   // IMPORTANT: We can only do that now, because only now will 
calls to
+   //the fetchers 'snapshotCurrentState()' method 
return at least
+   //the restored offsets
+   this.kafkaFetcher = fetcher;
+
+   if (!running) {
+   return;
+   }
+
+   // depending on whether we were restored with the current state 
version (1.3),
+   // remaining logic branches off into 2 paths:
+   //  1) New state - main fetcher loop executed as separate 
thread, with this
+   // thread running the partition discovery loop
+   //  2) Old state - partition discovery is disabled, simply 
going into the main fetcher loop
+
+   if (!restoredFromOldState) {
+   final AtomicReference fetcherErrorRef = new 
AtomicReference<>();
+   Thread fetcherThread = new Thread(new Runnable() {
+   @Override
+   public void run() {
+   try {
+   // run the fetcher' main work 
method
+   kafkaFetcher.runFetchLoop();
--- End diff --

Hmm, there actually isn't any 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996822#comment-15996822
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3746#discussion_r114788919
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
 ---
@@ -0,0 +1,842 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.MetricGroup;
+import 
org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the {@link KafkaConsumerThread}.
+ */
+public class KafkaConsumerThreadTest {
+
+   /**
+* Tests reassignment works correctly in the case when:
+*  - the consumer initially had no assignments
+*  - new unassigned partitions already have defined offsets
+*
+* Setting a timeout because the test will not finish if there is logic 
error with
+* the reassignment flow.
+*/
+   @SuppressWarnings("unchecked")
+   @Test(timeout = 1)
+   public void 
testReassigningPartitionsWithDefinedOffsetsWhenNoInitialAssignment() throws 
Exception {
+   final String testTopic = "test-topic";
+
+   //  new partitions with defined offsets 
+
+   KafkaTopicPartitionState newPartition1 = new 
KafkaTopicPartitionState<>(
+   new KafkaTopicPartition(testTopic, 0), new 
TopicPartition(testTopic, 0));
+   newPartition1.setOffset(23L);
+
+   KafkaTopicPartitionState newPartition2 = new 
KafkaTopicPartitionState<>(
+   new KafkaTopicPartition(testTopic, 1), new 
TopicPartition(testTopic, 1));
+   newPartition2.setOffset(31L);
+
+   final List 
newPartitions = new ArrayList<>(2);
+   newPartitions.add(newPartition1);
+   newPartitions.add(newPartition2);
+
+   //  setup mock KafkaConsumer 
+
+   // no initial assignment
+   final Map 
mockConsumerAssignmentsAndPositions = new LinkedHashMap<>();
+
+   final 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996819#comment-15996819
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3746#discussion_r114782717
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -219,6 +209,15 @@ public void run() {
}
}
 
+   try {
+   newPartitions = 
unassignedPartitionsQueue.pollBatch();
+   if (newPartitions != null) {
+   
reassignPartitions(newPartitions);
+   }
+   } catch (AbortedReassignmentException e) {
+   continue;
--- End diff --

Why is it necessary to continue here, instead of continuing with the rest 
of the loop?


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996821#comment-15996821
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3746#discussion_r114786022
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
 ---
@@ -0,0 +1,842 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.MetricGroup;
+import 
org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the {@link KafkaConsumerThread}.
+ */
+public class KafkaConsumerThreadTest {
+
+   /**
+* Tests reassignment works correctly in the case when:
+*  - the consumer initially had no assignments
+*  - new unassigned partitions already have defined offsets
+*
+* Setting a timeout because the test will not finish if there is logic 
error with
+* the reassignment flow.
+*/
+   @SuppressWarnings("unchecked")
+   @Test(timeout = 1)
+   public void 
testReassigningPartitionsWithDefinedOffsetsWhenNoInitialAssignment() throws 
Exception {
+   final String testTopic = "test-topic";
+
+   //  new partitions with defined offsets 
+
+   KafkaTopicPartitionState newPartition1 = new 
KafkaTopicPartitionState<>(
+   new KafkaTopicPartition(testTopic, 0), new 
TopicPartition(testTopic, 0));
+   newPartition1.setOffset(23L);
+
+   KafkaTopicPartitionState newPartition2 = new 
KafkaTopicPartitionState<>(
+   new KafkaTopicPartition(testTopic, 1), new 
TopicPartition(testTopic, 1));
+   newPartition2.setOffset(31L);
+
+   final List 
newPartitions = new ArrayList<>(2);
+   newPartitions.add(newPartition1);
+   newPartitions.add(newPartition2);
+
+   //  setup mock KafkaConsumer 
+
+   // no initial assignment
+   final Map 
mockConsumerAssignmentsAndPositions = new LinkedHashMap<>();
+
+   final 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996820#comment-15996820
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3746#discussion_r114760511
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -424,65 +485,136 @@ public void run(SourceContext sourceContext) 
throws Exception {
throw new Exception("The partitions were not set for 
the consumer");
}
 
-   // we need only do work, if we actually have partitions assigned
-   if (!subscribedPartitionsToStartOffsets.isEmpty()) {
-
-   // create the fetcher that will communicate with the 
Kafka brokers
-   final AbstractFetcher fetcher = createFetcher(
-   sourceContext,
-   subscribedPartitionsToStartOffsets,
-   periodicWatermarkAssigner,
-   punctuatedWatermarkAssigner,
-   (StreamingRuntimeContext) 
getRuntimeContext(),
-   offsetCommitMode);
-
-   // publish the reference, for snapshot-, commit-, and 
cancel calls
-   // IMPORTANT: We can only do that now, because only now 
will calls to
-   //the fetchers 'snapshotCurrentState()' 
method return at least
-   //the restored offsets
-   this.kafkaFetcher = fetcher;
-   if (!running) {
-   return;
-   }
-   
-   // (3) run the fetcher' main work method
-   fetcher.runFetchLoop();
+   this.runThread = Thread.currentThread();
+
+   // mark the subtask as temporarily idle if there are no initial 
seed partitions;
+   // once this subtask discovers some partitions and starts 
collecting records, the subtask's
+   // status will automatically be triggered back to be active.
+   if (subscribedPartitionsToStartOffsets.isEmpty()) {
+   sourceContext.markAsTemporarilyIdle();
}
-   else {
-   // this source never completes, so emit a 
Long.MAX_VALUE watermark
-   // to not block watermark forwarding
-   sourceContext.emitWatermark(new 
Watermark(Long.MAX_VALUE));
 
-   // wait until this is canceled
-   final Object waitLock = new Object();
+   // create the fetcher that will communicate with the Kafka 
brokers
+   final AbstractFetcher fetcher = createFetcher(
+   sourceContext,
+   subscribedPartitionsToStartOffsets,
+   periodicWatermarkAssigner,
+   punctuatedWatermarkAssigner,
+   (StreamingRuntimeContext) getRuntimeContext(),
+   offsetCommitMode);
+
+   // publish the reference, for snapshot-, commit-, and cancel 
calls
+   // IMPORTANT: We can only do that now, because only now will 
calls to
+   //the fetchers 'snapshotCurrentState()' method 
return at least
+   //the restored offsets
+   this.kafkaFetcher = fetcher;
+
+   if (!running) {
+   return;
+   }
+
+   // depending on whether we were restored with the current state 
version (1.3),
+   // remaining logic branches off into 2 paths:
+   //  1) New state - main fetcher loop executed as separate 
thread, with this
+   // thread running the partition discovery loop
+   //  2) Old state - partition discovery is disabled, simply 
going into the main fetcher loop
+
+   if (!restoredFromOldState) {
+   final AtomicReference fetcherErrorRef = new 
AtomicReference<>();
+   Thread fetcherThread = new Thread(new Runnable() {
+   @Override
+   public void run() {
+   try {
+   // run the fetcher' main work 
method
+   kafkaFetcher.runFetchLoop();
--- End diff --

Before this, the Fetcher was run 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996621#comment-15996621
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3746#discussion_r114757153
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java
 ---
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Base class for all partition discoverers.
+ *
+ * This partition discoverer base class implements the logic around 
bookkeeping
+ * discovered partitions, and using the information to determine whether 
or not there
+ * are new partitions that the consumer subtask should subscribe to.
+ *
+ * Subclass implementations should simply implement the logic of using 
the version-specific
+ * Kafka clients to fetch topic and partition metadata.
+ *
+ * Since Kafka clients are generally not thread-safe, partition 
discoverers should
+ * not be concurrently accessed. The only exception for this would be the 
{@link #wakeup()}
+ * call, which allows the discoverer to be interrupted during a {@link 
#discoverPartitions()} call.
+ */
+public abstract class AbstractPartitionDiscoverer {
+
+   /** Describes whether we are discovering partitions for fixed topics or 
a topic pattern. */
+   private final KafkaTopicsDescriptor topicsDescriptor;
+
+   /** Index of the consumer subtask that this partition discoverer 
belongs to. */
+   private final int indexOfThisSubtask;
+
+   /** The total number of consumer subtasks. */
+   private final int numParallelSubtasks;
+
+   /** Flag to determine whether or not the discoverer is closed. */
+   private volatile boolean closed = true;
+
+   /**
+* Flag to determine whether or not the discoverer had been woken up.
+* When set to {@code true}, {@link #discoverPartitions()} would be 
interrupted as early as possible.
+* Once interrupted, the flag is reset.
+*/
+   private volatile boolean wakeup;
+
+   /**
+* Map of topics to they're largest discovered partition id seen by 
this subtask.
+* This state may be updated whenever {@link 
AbstractPartitionDiscoverer#discoverPartitions()} or
+* {@link 
AbstractPartitionDiscoverer#setAndCheckDiscoveredPartition(KafkaTopicPartition)}
 is called.
+*
+* This is used to remove old partitions from the fetched partition 
lists. It is sufficient
+* to keep track of only the largest partition id because Kafka 
partition numbers are only
+* allowed to be increased and has incremental ids.
+*/
+   private final Map topicsToLargestDiscoveredPartitionId;
+
+   public AbstractPartitionDiscoverer(
+   KafkaTopicsDescriptor topicsDescriptor,
+   int indexOfThisSubtask,
+   int numParallelSubtasks) {
+
+   this.topicsDescriptor = checkNotNull(topicsDescriptor);
+   this.indexOfThisSubtask = indexOfThisSubtask;
+   this.numParallelSubtasks = numParallelSubtasks;
+   this.topicsToLargestDiscoveredPartitionId = new HashMap<>();
+   }
+
+   /**
+* Opens the partition discoverer, initializing all required Kafka 
connections.
+*
+* NOTE: thread-safety is not guaranteed.
+*/
+   public void open() throws Exception {
+   closed = false;
+   initializeConnections();
+   }
+
+   /**
+* Closes the partition discoverer, 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-05-03 Thread Bill Liu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15995858#comment-15995858
 ] 

Bill Liu commented on FLINK-4022:
-

any update on this feature? will it be released in 1.3 ?

> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer group of the subscribed topics.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976969#comment-15976969
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3746
  
@haohui FYI - tagging you as we discussed about this feature offline


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer group of the subscribed topics.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976900#comment-15976900
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/3746

[FLINK-4022] [kafka] Partition and topic discovery for FlinkKafkaConsumer

This PR adds the required internals to allow partition and topic regex 
pattern discovery in the `FlinkKafkaConsumer`.

It doesn't expose a new constructor that accepts regex topic patterns yet. 
I propose to expose that with https://issues.apache.org/jira/browse/FLINK-5704 
(deprecate the original FlinkKafkaConsumer constructors in favor of new ones 
with new offset behaviours). For this reason, I also propose to update the 
Kafka documentation when the new constructors are added.

## Design

Some description to ease review:

- `AbstractPartitionDiscoverer`:
An `AbstractPartitionDiscoverer` is a stateful utility instance that 
remembers what partitions are discovered already. It also wraps the logic for 
partition-to-subtask assignment. The main `run()` method now has a discovery 
loop that calls `AbstractPartitionDiscoverer#discoverPartitions()` on a fixed 
interval. This method returns only new partitions that should be subscribed by 
the subtask.
The returned partitions are used to invoke 
`AbstractFetcher#addDiscoveredPartitions(...)` on the fetcher.
On a fresh startup, `AbstractPartitionDiscoverer#discoverPartitions()` is 
also used to fetch the initial seed startup partitions in `open()`.

- `AbstractFetcher#addDiscoveredPartitions(...)`
The fetcher now has an `unassignedPartitionsQueue` that contains discovered 
partitions not yet assigned to Kafka consumers to be read. Whenever 
`addDiscoveredPartitions(...)` is called on the fetcher, the fetcher will 
create the state holders for the partitions, and add the partitions to the 
queue.
Concrete implementations of the fetcher should continuously poll this queue 
in the fetch loop. If partitions are found from the queue, they should be 
assigned for consuming.

- Concrete fetchers continuously polls the queue in `runFetchLoop()`
For 0.8, this simply means that the original `unassignedPartitionsQueue` in 
Kafka08Fetcher is moved to the base abstract fetcher class. Nothing else is 
touched.
For 0.9+, queue polling and partition reassignment for the high-level 
consumer happens in `KafkaConsumerThread`.

## Limitations

For the partition discovery to work properly after restores, the previous 
state had to be migrated to use union list states instead 
(`OperatorStateStore#getUnionListState()`).

One limitation that comes with this migration is that if the current run 
was restored from old state (Flink 1.1 / 1.2), then partition discovery is 
disabled. In order to use partition discovery, the user must do a manual 
snapshot (with the new union state) & restore.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-4022

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3746.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3746


commit 67c3b872cc0129b2f87b565e9fb92908926eac64
Author: Tzu-Li (Gordon) Tai 
Date:   2017-04-20T09:17:43Z

[FLINK-4022] [kafka] Partition / topic discovery for FlinkKafkaConsumer

commit bf2dd78e7706b9aae08b75ff917878e3f2ceb68b
Author: Tzu-Li (Gordon) Tai 
Date:   2017-04-20T15:08:39Z

[FLINK-4022] Migrate to union list state




> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976879#comment-15976879
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user tzulitai closed the pull request at:

https://github.com/apache/flink/pull/3476


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer group of the subscribed topics.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976875#comment-15976875
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3476
  
Closing this PR in a favor of an updated version ..


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer group of the subscribed topics.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897191#comment-15897191
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3476
  
Seems like some Kafka tests are failing .. looking into it.


> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer group of the subscribed topics.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2017-03-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896986#comment-15896986
 ] 

ASF GitHub Bot commented on FLINK-4022:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/3476

[FLINK-4022] [kafka] Partition and topic pattern discovery for 
FlinkKafkaConsumer

This PR adds partition and topic regex pattern discovery to the 
`FlinkKafkaConsumer`.

It doesn't expose a new constructor that accepts regex topic patterns yet. 
I propose to expose that with https://issues.apache.org/jira/browse/FLINK-5704 
(deprecate the original `FlinkKafkaConsumer` constructors in favor of new ones 
with new offset behaviours). For this reason, I also propose to update the 
Kafka documentation when the new constructors are added.

### Design

Some description to ease review:

- `AbstractPartitionDiscoverer`:
A `AbstractPartitionDiscoverer` is a stateful utility instance that 
remembers what partitions are discovered already. It also wraps the logic for 
partition-to-subtask assignment. The main `run()` method now has a discovery 
loop that calls `AbstractPartitionDiscoverer#discoverPartitions()` on a fixed 
interval. This method returns only new partitions that should be subscribed by 
the subtask.
The returned partitions are used to invoke 
`AbstractFetcher#addDiscoveredPartitions(...)` on the fetcher.

On a fresh startup, `AbstractPartitionDiscoverer#discoverPartitions()` is 
also used to fetch the initial seed startup partitions in `open()`.

- `AbstractFetcher#addDiscoveredPartitions(...)`

The fetcher now has a `unassignedPartitionsQueue` that contains discovered 
partitions not yet consumed by concrete Kafka clients. Whenever 
`addDiscoveredPartitions(...)` is called on the fetcher, the fetcher will 
create the state holders for the partitions, and add the partitions to the 
queue.

Concrete implementations of the fetcher should continuously poll this queue 
in the fetch loop. If partitions are found from the queue, they should be 
assigned for consuming.

- Concrete fetchers continuously polls the queue in `runFetchLoop()`

For 0.8, this simply means that the original `unassignedPartitionsQueue` in 
`Kafka08Fetcher` is moved to the base abstract fetcher class. Nothing else is 
touched.

For 0.9+, queue polling and partition reassignment for the high-level 
consumer happens in `KafkaConsumerThread`.

### TODOs

This PR serves as a preview for the new functionality. Below are some 
pending TODOs

- Currently, partition discovery will not work correctly after restore. The 
reason for this is explained with `TODO` comments within the 
`FlinkKafkaConsumerBase#open()` method. For this to work correctly, @rmetzger 
and I are considering 2 options: 1) use broadcast state, or 2) assign 
partitions using `maxParallelism` and `assignedKeyGroupIds` instead of subtask 
index / number of subtasks.

- The PR still lacks exactly-once integration tests with Kafka 
repartitioning / dynamic topics.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-4022

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3476.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3476


commit 469a3c964e13ff559afd1d46933113bcff48eafc
Author: Tzu-Li (Gordon) Tai 
Date:   2017-03-06T09:20:32Z

[FLINK-4022] [kafka] Partition and topic pattern discovery for 
FlinkKafkaConsumer




> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2016-12-02 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15715453#comment-15715453
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4022:


I would like to move along with this task with a first-version implementation 
for 1.2 that doesn't depend on FLINK-5017.
FLINK-5017 most probably need more time for reviews and testing to make it into 
master, and it might not be a good idea to slow down releasing of user 
requested features like this one.

I'll move this issue up my todo list and prepare a PR over the next few days 
now.

> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2016-11-03 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15632537#comment-15632537
 ] 

Jamie Grier commented on FLINK-4022:


Yes, definitely get input from Stephan and/or Aljoscha.  There may be a good 
reason why this simple solution wont' work.

> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer group of the subscribed topics.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2016-11-03 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15632388#comment-15632388
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4022:


Hi [~jgrier], I actually was thinking about a solution like the one you 
explained, which I just described in FLINK-4576 also.

> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer group of the subscribed topics.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2016-11-03 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15632378#comment-15632378
 ] 

Jamie Grier commented on FLINK-4022:


Rather than emitting Long.MAX_VALUE for subtasks without partitions or using a 
global watermark service, can we not just have subtasks without partitions (or 
in general are not making progress) emit a special value for watermark which 
means "do not consider this subtask when calculating the current watermark 
downstream"?

The benefit, of course, is that this doesn't require a central service / 
coordination.  Can't this achieve the same thing?  When a partition comes on 
line after this, of course, all of it's data will be considered late -- but 
basically, what else could you do?

> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2016-11-02 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15630704#comment-15630704
 ] 

Stephan Ewen commented on FLINK-4022:
-

Ah, okay, this is relevant when there are initially idle subtasks.
When there are not, this is not a problem.

> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer group of the subscribed topics.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2016-11-02 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15629801#comment-15629801
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4022:


Hi [~StephanEwen],

Right now, for subtasks that do not have partitions assigned to it, we will 
emit a max value watermark from that subtask because it will not read any data. 
However, with partition discovery enabled, subtasks which initially do not have 
partitions might be assigned one later on and start reading data, and this will 
mess up the watermarks downstream.

We also cannot let idle subtasks not emit any watermarks and just wait for 
partitions be assigned to it, because then downstream time window states will 
unboundly accumulate (FLINK-4341).

So, the watermark service came around as a mean to let source subtasks that 
currently do not read any data (thus cannot emit any watermarks for event time) 
emit the global low watermark across subtasks instead.

For this I think we still need the watermark service.

> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2016-11-02 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15629341#comment-15629341
 ] 

Stephan Ewen commented on FLINK-4022:
-

Thinking about this, I am actually not sure that we need the low watermark 
service:

For Event Time:
  - The actual timestamps are usually part of the data, not derived from the 
low watermark service
  - If the timestamps are late, then the data is actually late (partition was 
discovered late)
  - Watermarks are derived from the data not some other service.
  - When the FlinkKafkaConsumer tracks per-partition-watermarks, then the new 
"currentWatermark" for the new partition in that source instance starts with 
that source's current watermark
  - When there is only watermark generation after the FlinkKafkaConsumer, then 
that one has its watermark anyways and it just sees potentially some  late data 
passes through it

For Ingestion time:
  - Timestamps and watermarks depend purely on the local clock

[~tzulitai] and [~aljoscha] What do you think? Why do we still need a watermark 
service here?

> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2016-11-02 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15629178#comment-15629178
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4022:


This is currently blocked by FLINK-4576, which I'm working on resolving first. 
For this task in particular there isn't much progress yet. However, I'd also 
like to target this feature for the 1.2 release.

> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer group of the subscribed topics.



--
This message was sent by Atlassian JIRA

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2016-11-02 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15629091#comment-15629091
 ] 

Jamie Grier commented on FLINK-4022:


I also think this would be a great feature and a few Flink users have asked 
about this -- both dynamic partition discover within one topic and also dynamic 
topic discovery.

Any progress on this?

> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer group of the subscribed topics.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2016-07-08 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367444#comment-15367444
 ] 

Robert Metzger commented on FLINK-4022:
---

Hi [~tzulitai], 
Yes, go ahead and assign the JIRA to yourself. I'm not aware of anybody else 
working on this.
I would prefer to merge this feature after the 1.1 release to give us more time 
to test it properly.
I also think that the implementation is going to be somewhat similar to the 
Kinesis implementation.

I don't expect any issues from Kafka with the constant partition fetching, but 
we have to do some experiments to find that out.

Please decide yourself whether it makes sense to fix the related issues before 
this one or together with this one.

> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2016-07-07 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367094#comment-15367094
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4022:


Related: https://issues.apache.org/jira/browse/FLINK-4023 and 
https://issues.apache.org/jira/browse/FLINK-4155.

> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold partitions after a rebalance. 
> Instead, un-assigned subtasks should be running a fetcher instance too and 
> take part as a process pool for the consumer group of the subscribed topics.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2016-07-07 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367086#comment-15367086
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4022:


[~rmetzger] I can start working on this now (but I don't think I'll be able to 
finish it before RCs for 1.1). Before I get onto it, I'll like to collect 
anything I should be aware of for the implementation. I think the way to go 
will mainly be quite the same as the shard discovery in the Kinesis connector. 
Is there any other Kafka API you know of that we may use for this? Or any 
implications with constantly fetching partition info from Kafka, like what the 
Kinesis connector does?

> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Example: allow users to subscribe to "topic-n*", so that the consumer 
> automatically reads from "topic-n1", "topic-n2", ... and so on as they are 
> added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.
> 4. If subtasks don't initially have any assigned partitions, we shouldn't 
> emit MAX_VALUE watermark, since it may hold 

[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer

2016-06-05 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15315787#comment-15315787
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4022:


I'd like to start working on this feature if it's ok with [~rmetzger], after 
confirming that this implementation is acceptable.

It'll introduce more complexity for state restoring for now, but once merged 
restore states are available it'll be simple again.
On the other hand, if we want to stick to defined partition assignments, then 
the complexity will probably be in implementing coordination for discovering 
new partitions / rebalancing between subtasks ourselves.

> Partition discovery / regex topic subscription for the Kafka consumer
> -
>
> Key: FLINK-4022
> URL: https://issues.apache.org/jira/browse/FLINK-4022
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> Allow users to subscribe to "topic-n*", so that the consumer automatically 
> reads from "topic-n1", "topic-n2", ... and so on as they are added to Kafka.
> I propose to implement this feature by the following description:
> Since the overall list of partitions to read will change after job 
> submission, the main big change required for this feature will be dynamic 
> partition assignment to subtasks while the Kafka consumer is running. This 
> will mainly be accomplished using Kafka 0.9.x API 
> `KafkaConsumer#subscribe(java.util.regex.Pattern, 
> ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be 
> added to the same consumer group when instantiated, and rely on Kafka to 
> dynamically reassign partitions to them whenever a rebalance happens. The 
> registered `ConsumerRebalanceListener` is a callback that is called right 
> before and after rebalancing happens. We'll use this callback to let each 
> subtask commit its last offsets of partitions its currently responsible of to 
> an external store (or Kafka) before a rebalance; after rebalance and the 
> substasks gets the new partitions it'll be reading from, they'll read from 
> the external store to get the last offsets for their new partitions 
> (partitions which don't have offset entries in the store are new partitions 
> causing the rebalancing).
> The tricky part will be restoring Flink checkpoints when the partition 
> assignment is dynamic. Snapshotting will remain the same - subtasks snapshot 
> the offsets of partitions they are currently holding. Restoring will be  a 
> bit different in that subtasks might not be assigned matching partitions to 
> the snapshot the subtask is restored with (since we're letting Kafka 
> dynamically assign partitions). There will need to be a coordination process 
> where, if a restore state exists, all subtasks first commit the offsets they 
> receive (as a result of the restore state) to the external store, and then 
> all subtasks attempt to find a last offset for the partitions it is holding.
> However, if the globally merged restore state feature mentioned by 
> [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
> available, then the restore will be simple again, as each subtask has full 
> access to previous global state therefore coordination is not required.
> I think changing to dynamic partition assignment is also good in the long run 
> for handling topic repartitioning.
> Overall,
> User-facing API changes:
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
> DeserializationSchema, Properties)
> - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
> KeyedDeserializationSchema, Properties)
> Implementation changes:
> 1. Dynamic partition assigning depending on KafkaConsumer#subscribe
> - Remove partition list querying from constructor
> - Remove static partition assigning to substasks in run()
> - Instead of using KafkaConsumer#assign() in fetchers to manually assign 
> static partitions, use subscribe() registered with the callback 
> implementation explained above.
> 2. Restoring from checkpointed states
> - Snapshotting should remain unchanged
> - Restoring requires subtasks to coordinate the restored offsets they hold 
> before continuing (unless we are able to have merged restore states).
> 3. For previous consumer functionality (consume from fixed list of topics), 
> the KafkaConsumer#subscribe() has a corresponding overload method for fixed 
> list of topics. We can simply decide which subscribe() overload to use 
> depending on whether a regex Pattern or list of topics is supplied.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)