[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16071078#comment-16071078 ] ASF GitHub Bot commented on FLINK-4022: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3746 > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.4.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher instance too and > take part as a process pool for the consumer group of the subscribed topics. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069669#comment-16069669 ] ASF GitHub Bot commented on FLINK-4022: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3746 (please ignore the batch of commits. I'm just using this branch for an overall Travis run before committing the batch; personal accounts get time-outs too constantly) > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.4.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher instance too and > take part as a process pool for the consumer
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069437#comment-16069437 ] ASF GitHub Bot commented on FLINK-4022: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3746 Travis passed, merging ... > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.4.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher instance too and > take part as a process pool for the consumer group of the subscribed topics. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16068277#comment-16068277 ] ASF GitHub Bot commented on FLINK-4022: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3746 Thanks for the review @aljoscha. Will address the comment and merge after Travis gives green :) > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.4.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher instance too and > take part as a process pool for the consumer group of the subscribed topics. -- This message was sent by Atlassian
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16068273#comment-16068273 ] ASF GitHub Bot commented on FLINK-4022: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3746#discussion_r124785388 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -189,6 +192,8 @@ * @param topics fixed list of topics to subscribe to (null, if using topic pattern) * @param topicPattern the topic pattern to subscribe to (null, if using fixed topics) * @param deserializer The deserializer to turn raw byte messages into Java/Scala objects. +* @param discoveryIntervalMillis the topic / partition discovery interval, in +*milliseconds (0 if discovery is disabled). --- End diff -- Here it says `0` for disabled while in other places it's `Long.MIN_VALUE`. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.4.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16068095#comment-16068095 ] ASF GitHub Bot commented on FLINK-4022: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3746 Hi @aljoscha, I've rebased this PR, fixed the previous blocking issue (discovery enabling & restoring from round-robin list state), and added documentation for the limitation. Could you have another look? Thanks! > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.4.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher instance too and >
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015457#comment-16015457 ] ASF GitHub Bot commented on FLINK-4022: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3746 Hi @haohui, I revisited this, but I think there is no easy workaround for this, unfortunately. This can only be properly merged in 1.4 after some additional requirements are added to Flink. However, if you know that during Flink 1.2 your FlinkKafkaConsumer was not rescaled before, then in that case specifically you should be able to use this patch. It would however require a custom build of the connector on your side. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.3.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16013493#comment-16013493 ] ASF GitHub Bot commented on FLINK-4022: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3746 @haohui I can not guarantee this, but I can try revisiting this for a workaround after I've done some testing on the current RC1. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.3.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher instance too and > take part as a process pool for the consumer group of the subscribed topics. -- This
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16013303#comment-16013303 ] ASF GitHub Bot commented on FLINK-4022: --- Github user haohui commented on the issue: https://github.com/apache/flink/pull/3746 @tzulitai we are interested in putting this feature in production soon. Do you think whether it is possible for us to get a workaround for 1.3? Thanks. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.3.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher instance too and > take part as a process pool for the consumer group of the
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009195#comment-16009195 ] ASF GitHub Bot commented on FLINK-4022: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3746 Hi @haohui, unfortunately we might not be able to merge this for 1.3. The reason is that the feature requires a migration from partitionable list state to union list state, which currently can not be properly done, as can be seen in Aljoscha's inline comment. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.3.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009191#comment-16009191 ] ASF GitHub Bot commented on FLINK-4022: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3746#discussion_r116352819 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -503,23 +644,30 @@ public void close() throws Exception { public void initializeState(FunctionInitializationContext context) throws Exception { OperatorStateStore stateStore = context.getOperatorStateStore(); - offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); - if (context.isRestored()) { - if (restoredState == null) { - restoredState = new HashMap<>(); - for (Tuple2kafkaOffset : offsetsStateForCheckpoint.get()) { - restoredState.put(kafkaOffset.f0, kafkaOffset.f1); - } + ListState > oldRoundRobinListState = + stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); - LOG.info("Setting restore state in the FlinkKafkaConsumer."); - if (LOG.isDebugEnabled()) { - LOG.debug("Using the following offsets: {}", restoredState); - } + this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>( + OFFSETS_STATE_NAME, + TypeInformation.of(new TypeHint >() {}))); + + if (context.isRestored() && !restoredFromOldState) { + restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator()); + + // migrate from 1.2 state, if there is any + for (Tuple2 kafkaOffset : oldRoundRobinListState.get()) { + restoredFromOldState = true; --- End diff -- You're right, this is definitely problematic. I think we'll have to block this feature until we have the required state information accessible before we can merge this then. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.3.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16006678#comment-16006678 ] ASF GitHub Bot commented on FLINK-4022: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3746#discussion_r116000971 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -503,23 +644,30 @@ public void close() throws Exception { public void initializeState(FunctionInitializationContext context) throws Exception { OperatorStateStore stateStore = context.getOperatorStateStore(); - offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); - if (context.isRestored()) { - if (restoredState == null) { - restoredState = new HashMap<>(); - for (Tuple2kafkaOffset : offsetsStateForCheckpoint.get()) { - restoredState.put(kafkaOffset.f0, kafkaOffset.f1); - } + ListState > oldRoundRobinListState = + stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); - LOG.info("Setting restore state in the FlinkKafkaConsumer."); - if (LOG.isDebugEnabled()) { - LOG.debug("Using the following offsets: {}", restoredState); - } + this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>( + OFFSETS_STATE_NAME, + TypeInformation.of(new TypeHint >() {}))); + + if (context.isRestored() && !restoredFromOldState) { + restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator()); + + // migrate from 1.2 state, if there is any + for (Tuple2 kafkaOffset : oldRoundRobinListState.get()) { + restoredFromOldState = true; --- End diff -- Could it be that we restore from an old 1.2 snapshot and don't get anything here because we simply weren't assigned any state. (For example because the parallelism is higher than before.) > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.3.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16006677#comment-16006677 ] ASF GitHub Bot commented on FLINK-4022: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3746#discussion_r115996970 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java --- @@ -106,31 +139,69 @@ protected AbstractFetcher( } } - // create our partition state according to the timestamp/watermark mode - this.subscribedPartitionStates = initializeSubscribedPartitionStates( - assignedPartitionsWithInitialOffsets, + this.unassignedPartitionsQueue = new ClosableBlockingQueue<>(); + + // initialize subscribed partition states with seed partitions + this.subscribedPartitionStates = createPartitionStateHolders( + seedPartitionsWithInitialOffsets, timestampWatermarkMode, - watermarksPeriodic, watermarksPunctuated, + watermarksPeriodic, + watermarksPunctuated, userCodeClassLoader); - // check that all partition states have a defined offset + // check that all seed partition states have a defined offset for (KafkaTopicPartitionState partitionState : subscribedPartitionStates) { if (!partitionState.isOffsetDefined()) { - throw new IllegalArgumentException("The fetcher was assigned partitions with undefined initial offsets."); + throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets."); } } - + + // all seed partitions are not assigned yet, so should be added to the unassigned partitions queue + for (KafkaTopicPartitionState partition : subscribedPartitionStates) { + unassignedPartitionsQueue.add(partition); + } + // if we have periodic watermarks, kick off the interval scheduler if (timestampWatermarkMode == PERIODIC_WATERMARKS) { - KafkaTopicPartitionStateWithPeriodicWatermarks[] parts = - (KafkaTopicPartitionStateWithPeriodicWatermarks[]) subscribedPartitionStates; - - PeriodicWatermarkEmitter periodicEmitter = - new PeriodicWatermarkEmitter(parts, sourceContext, processingTimeProvider, autoWatermarkInterval); + @SuppressWarnings("unchecked") + PeriodicWatermarkEmitter periodicEmitter = new PeriodicWatermarkEmitter( --- End diff -- Could add the generic parameter `KPH` here. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.3.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from >
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16006679#comment-16006679 ] ASF GitHub Bot commented on FLINK-4022: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3746#discussion_r115997026 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java --- @@ -106,31 +139,69 @@ protected AbstractFetcher( } } - // create our partition state according to the timestamp/watermark mode - this.subscribedPartitionStates = initializeSubscribedPartitionStates( - assignedPartitionsWithInitialOffsets, + this.unassignedPartitionsQueue = new ClosableBlockingQueue<>(); + + // initialize subscribed partition states with seed partitions + this.subscribedPartitionStates = createPartitionStateHolders( + seedPartitionsWithInitialOffsets, timestampWatermarkMode, - watermarksPeriodic, watermarksPunctuated, + watermarksPeriodic, + watermarksPunctuated, userCodeClassLoader); - // check that all partition states have a defined offset + // check that all seed partition states have a defined offset for (KafkaTopicPartitionState partitionState : subscribedPartitionStates) { --- End diff -- Could add generic parameter, but is already existing code. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.3.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > -
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16004093#comment-16004093 ] ASF GitHub Bot commented on FLINK-4022: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3746 @haohui you should be able to the PR now, would be great to hear feedback on this. As of now I'm not quite sure whether this will make it into 1.3 though; I think we'll have to see about the status of the 1.3 release in a while. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.3.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16003796#comment-16003796 ] ASF GitHub Bot commented on FLINK-4022: --- Github user haohui commented on the issue: https://github.com/apache/flink/pull/3746 Any updates on this? Would love to try this out :-) > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.3.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher instance too and > take part as a process pool for the consumer group of the subscribed topics. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996987#comment-15996987 ] ASF GitHub Bot commented on FLINK-4022: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3746#discussion_r114821998 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java --- @@ -219,6 +209,15 @@ public void run() { } } + try { + newPartitions = unassignedPartitionsQueue.pollBatch(); + if (newPartitions != null) { + reassignPartitions(newPartitions); + } + } catch (AbortedReassignmentException e) { + continue; --- End diff -- Thanks! that makes sense > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.3.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold >
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996882#comment-15996882 ] ASF GitHub Bot commented on FLINK-4022: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3746#discussion_r114803295 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java --- @@ -219,6 +209,15 @@ public void run() { } } + try { + newPartitions = unassignedPartitionsQueue.pollBatch(); + if (newPartitions != null) { + reassignPartitions(newPartitions); + } + } catch (AbortedReassignmentException e) { + continue; --- End diff -- One extra remark: the external `wakeup` call was intended to un-block the main thread from any Kafka blocking operation, and commit offsets as soon as possible (which happens at the start of the loop). It's also for making sure that the main thread stops work as soon as possible so that there isn't any strange long job shutdown issues (which occurred before). Hence it's important to restore effective wakeup calls (or mimic their behaviour) and fall through the loop when an external operation did call wakeup. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.3.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1.
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996865#comment-15996865 ] ASF GitHub Bot commented on FLINK-4022: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3746 @aljoscha thanks for the review! Yes, the Kafka 0.8 code hasn't been touched much, and the `KafkaPartitionDiscoverer08` is mainly just a copy of the previous partition fetching code with only minor refactoring. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.3.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher instance too and > take
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996862#comment-15996862 ] ASF GitHub Bot commented on FLINK-4022: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3746#discussion_r114795536 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java --- @@ -219,6 +209,15 @@ public void run() { } } + try { + newPartitions = unassignedPartitionsQueue.pollBatch(); + if (newPartitions != null) { + reassignPartitions(newPartitions); + } + } catch (AbortedReassignmentException e) { + continue; --- End diff -- the reason is that the `AbortedReassignmentException` is thrown only if the Kafka consumer was `wakeup` before the reassignment started and that wakeup effect was consumed by an operation during the reassignment. Now, if the loop just continues, the next Kafka operation (the `poll`) would not be woken up because the wakeup was already consumed. So, effectively, it would mean the same thing to do this in the catch block instead of falling through the loop: ``` } catch (AbortedReassignmentException e) { consumer.wakeup(); } ``` I'm just eagerly continuing instead of restoring the wakeup. Does that make sense? > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.3.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996839#comment-15996839 ] ASF GitHub Bot commented on FLINK-4022: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3746#discussion_r114793528 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -424,65 +485,136 @@ public void run(SourceContext sourceContext) throws Exception { throw new Exception("The partitions were not set for the consumer"); } - // we need only do work, if we actually have partitions assigned - if (!subscribedPartitionsToStartOffsets.isEmpty()) { - - // create the fetcher that will communicate with the Kafka brokers - final AbstractFetcherfetcher = createFetcher( - sourceContext, - subscribedPartitionsToStartOffsets, - periodicWatermarkAssigner, - punctuatedWatermarkAssigner, - (StreamingRuntimeContext) getRuntimeContext(), - offsetCommitMode); - - // publish the reference, for snapshot-, commit-, and cancel calls - // IMPORTANT: We can only do that now, because only now will calls to - //the fetchers 'snapshotCurrentState()' method return at least - //the restored offsets - this.kafkaFetcher = fetcher; - if (!running) { - return; - } - - // (3) run the fetcher' main work method - fetcher.runFetchLoop(); + this.runThread = Thread.currentThread(); + + // mark the subtask as temporarily idle if there are no initial seed partitions; + // once this subtask discovers some partitions and starts collecting records, the subtask's + // status will automatically be triggered back to be active. + if (subscribedPartitionsToStartOffsets.isEmpty()) { + sourceContext.markAsTemporarilyIdle(); } - else { - // this source never completes, so emit a Long.MAX_VALUE watermark - // to not block watermark forwarding - sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE)); - // wait until this is canceled - final Object waitLock = new Object(); + // create the fetcher that will communicate with the Kafka brokers + final AbstractFetcher fetcher = createFetcher( + sourceContext, + subscribedPartitionsToStartOffsets, + periodicWatermarkAssigner, + punctuatedWatermarkAssigner, + (StreamingRuntimeContext) getRuntimeContext(), + offsetCommitMode); + + // publish the reference, for snapshot-, commit-, and cancel calls + // IMPORTANT: We can only do that now, because only now will calls to + //the fetchers 'snapshotCurrentState()' method return at least + //the restored offsets + this.kafkaFetcher = fetcher; + + if (!running) { + return; + } + + // depending on whether we were restored with the current state version (1.3), + // remaining logic branches off into 2 paths: + // 1) New state - main fetcher loop executed as separate thread, with this + // thread running the partition discovery loop + // 2) Old state - partition discovery is disabled, simply going into the main fetcher loop + + if (!restoredFromOldState) { + final AtomicReference fetcherErrorRef = new AtomicReference<>(); + Thread fetcherThread = new Thread(new Runnable() { + @Override + public void run() { + try { + // run the fetcher' main work method + kafkaFetcher.runFetchLoop(); --- End diff -- Hmm, there actually isn't any
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996822#comment-15996822 ] ASF GitHub Bot commented on FLINK-4022: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3746#discussion_r114788919 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java --- @@ -0,0 +1,842 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyListOf; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit tests for the {@link KafkaConsumerThread}. + */ +public class KafkaConsumerThreadTest { + + /** +* Tests reassignment works correctly in the case when: +* - the consumer initially had no assignments +* - new unassigned partitions already have defined offsets +* +* Setting a timeout because the test will not finish if there is logic error with +* the reassignment flow. +*/ + @SuppressWarnings("unchecked") + @Test(timeout = 1) + public void testReassigningPartitionsWithDefinedOffsetsWhenNoInitialAssignment() throws Exception { + final String testTopic = "test-topic"; + + // new partitions with defined offsets + + KafkaTopicPartitionState newPartition1 = new KafkaTopicPartitionState<>( + new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0)); + newPartition1.setOffset(23L); + + KafkaTopicPartitionState newPartition2 = new KafkaTopicPartitionState<>( + new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1)); + newPartition2.setOffset(31L); + + final ListnewPartitions = new ArrayList<>(2); + newPartitions.add(newPartition1); + newPartitions.add(newPartition2); + + // setup mock KafkaConsumer + + // no initial assignment + final Map mockConsumerAssignmentsAndPositions = new LinkedHashMap<>(); + + final
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996819#comment-15996819 ] ASF GitHub Bot commented on FLINK-4022: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3746#discussion_r114782717 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java --- @@ -219,6 +209,15 @@ public void run() { } } + try { + newPartitions = unassignedPartitionsQueue.pollBatch(); + if (newPartitions != null) { + reassignPartitions(newPartitions); + } + } catch (AbortedReassignmentException e) { + continue; --- End diff -- Why is it necessary to continue here, instead of continuing with the rest of the loop? > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.3.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996821#comment-15996821 ] ASF GitHub Bot commented on FLINK-4022: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3746#discussion_r114786022 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java --- @@ -0,0 +1,842 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyListOf; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit tests for the {@link KafkaConsumerThread}. + */ +public class KafkaConsumerThreadTest { + + /** +* Tests reassignment works correctly in the case when: +* - the consumer initially had no assignments +* - new unassigned partitions already have defined offsets +* +* Setting a timeout because the test will not finish if there is logic error with +* the reassignment flow. +*/ + @SuppressWarnings("unchecked") + @Test(timeout = 1) + public void testReassigningPartitionsWithDefinedOffsetsWhenNoInitialAssignment() throws Exception { + final String testTopic = "test-topic"; + + // new partitions with defined offsets + + KafkaTopicPartitionState newPartition1 = new KafkaTopicPartitionState<>( + new KafkaTopicPartition(testTopic, 0), new TopicPartition(testTopic, 0)); + newPartition1.setOffset(23L); + + KafkaTopicPartitionState newPartition2 = new KafkaTopicPartitionState<>( + new KafkaTopicPartition(testTopic, 1), new TopicPartition(testTopic, 1)); + newPartition2.setOffset(31L); + + final ListnewPartitions = new ArrayList<>(2); + newPartitions.add(newPartition1); + newPartitions.add(newPartition2); + + // setup mock KafkaConsumer + + // no initial assignment + final Map mockConsumerAssignmentsAndPositions = new LinkedHashMap<>(); + + final
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996820#comment-15996820 ] ASF GitHub Bot commented on FLINK-4022: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3746#discussion_r114760511 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -424,65 +485,136 @@ public void run(SourceContext sourceContext) throws Exception { throw new Exception("The partitions were not set for the consumer"); } - // we need only do work, if we actually have partitions assigned - if (!subscribedPartitionsToStartOffsets.isEmpty()) { - - // create the fetcher that will communicate with the Kafka brokers - final AbstractFetcherfetcher = createFetcher( - sourceContext, - subscribedPartitionsToStartOffsets, - periodicWatermarkAssigner, - punctuatedWatermarkAssigner, - (StreamingRuntimeContext) getRuntimeContext(), - offsetCommitMode); - - // publish the reference, for snapshot-, commit-, and cancel calls - // IMPORTANT: We can only do that now, because only now will calls to - //the fetchers 'snapshotCurrentState()' method return at least - //the restored offsets - this.kafkaFetcher = fetcher; - if (!running) { - return; - } - - // (3) run the fetcher' main work method - fetcher.runFetchLoop(); + this.runThread = Thread.currentThread(); + + // mark the subtask as temporarily idle if there are no initial seed partitions; + // once this subtask discovers some partitions and starts collecting records, the subtask's + // status will automatically be triggered back to be active. + if (subscribedPartitionsToStartOffsets.isEmpty()) { + sourceContext.markAsTemporarilyIdle(); } - else { - // this source never completes, so emit a Long.MAX_VALUE watermark - // to not block watermark forwarding - sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE)); - // wait until this is canceled - final Object waitLock = new Object(); + // create the fetcher that will communicate with the Kafka brokers + final AbstractFetcher fetcher = createFetcher( + sourceContext, + subscribedPartitionsToStartOffsets, + periodicWatermarkAssigner, + punctuatedWatermarkAssigner, + (StreamingRuntimeContext) getRuntimeContext(), + offsetCommitMode); + + // publish the reference, for snapshot-, commit-, and cancel calls + // IMPORTANT: We can only do that now, because only now will calls to + //the fetchers 'snapshotCurrentState()' method return at least + //the restored offsets + this.kafkaFetcher = fetcher; + + if (!running) { + return; + } + + // depending on whether we were restored with the current state version (1.3), + // remaining logic branches off into 2 paths: + // 1) New state - main fetcher loop executed as separate thread, with this + // thread running the partition discovery loop + // 2) Old state - partition discovery is disabled, simply going into the main fetcher loop + + if (!restoredFromOldState) { + final AtomicReference fetcherErrorRef = new AtomicReference<>(); + Thread fetcherThread = new Thread(new Runnable() { + @Override + public void run() { + try { + // run the fetcher' main work method + kafkaFetcher.runFetchLoop(); --- End diff -- Before this, the Fetcher was run
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996621#comment-15996621 ] ASF GitHub Bot commented on FLINK-4022: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3746#discussion_r114757153 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscoverer.java --- @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Base class for all partition discoverers. + * + * This partition discoverer base class implements the logic around bookkeeping + * discovered partitions, and using the information to determine whether or not there + * are new partitions that the consumer subtask should subscribe to. + * + * Subclass implementations should simply implement the logic of using the version-specific + * Kafka clients to fetch topic and partition metadata. + * + * Since Kafka clients are generally not thread-safe, partition discoverers should + * not be concurrently accessed. The only exception for this would be the {@link #wakeup()} + * call, which allows the discoverer to be interrupted during a {@link #discoverPartitions()} call. + */ +public abstract class AbstractPartitionDiscoverer { + + /** Describes whether we are discovering partitions for fixed topics or a topic pattern. */ + private final KafkaTopicsDescriptor topicsDescriptor; + + /** Index of the consumer subtask that this partition discoverer belongs to. */ + private final int indexOfThisSubtask; + + /** The total number of consumer subtasks. */ + private final int numParallelSubtasks; + + /** Flag to determine whether or not the discoverer is closed. */ + private volatile boolean closed = true; + + /** +* Flag to determine whether or not the discoverer had been woken up. +* When set to {@code true}, {@link #discoverPartitions()} would be interrupted as early as possible. +* Once interrupted, the flag is reset. +*/ + private volatile boolean wakeup; + + /** +* Map of topics to they're largest discovered partition id seen by this subtask. +* This state may be updated whenever {@link AbstractPartitionDiscoverer#discoverPartitions()} or +* {@link AbstractPartitionDiscoverer#setAndCheckDiscoveredPartition(KafkaTopicPartition)} is called. +* +* This is used to remove old partitions from the fetched partition lists. It is sufficient +* to keep track of only the largest partition id because Kafka partition numbers are only +* allowed to be increased and has incremental ids. +*/ + private final MaptopicsToLargestDiscoveredPartitionId; + + public AbstractPartitionDiscoverer( + KafkaTopicsDescriptor topicsDescriptor, + int indexOfThisSubtask, + int numParallelSubtasks) { + + this.topicsDescriptor = checkNotNull(topicsDescriptor); + this.indexOfThisSubtask = indexOfThisSubtask; + this.numParallelSubtasks = numParallelSubtasks; + this.topicsToLargestDiscoveredPartitionId = new HashMap<>(); + } + + /** +* Opens the partition discoverer, initializing all required Kafka connections. +* +* NOTE: thread-safety is not guaranteed. +*/ + public void open() throws Exception { + closed = false; + initializeConnections(); + } + + /** +* Closes the partition discoverer,
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15995858#comment-15995858 ] Bill Liu commented on FLINK-4022: - any update on this feature? will it be released in 1.3 ? > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.3.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher instance too and > take part as a process pool for the consumer group of the subscribed topics. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976969#comment-15976969 ] ASF GitHub Bot commented on FLINK-4022: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3746 @haohui FYI - tagging you as we discussed about this feature offline > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher instance too and > take part as a process pool for the consumer group of the subscribed topics. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976900#comment-15976900 ] ASF GitHub Bot commented on FLINK-4022: --- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/3746 [FLINK-4022] [kafka] Partition and topic discovery for FlinkKafkaConsumer This PR adds the required internals to allow partition and topic regex pattern discovery in the `FlinkKafkaConsumer`. It doesn't expose a new constructor that accepts regex topic patterns yet. I propose to expose that with https://issues.apache.org/jira/browse/FLINK-5704 (deprecate the original FlinkKafkaConsumer constructors in favor of new ones with new offset behaviours). For this reason, I also propose to update the Kafka documentation when the new constructors are added. ## Design Some description to ease review: - `AbstractPartitionDiscoverer`: An `AbstractPartitionDiscoverer` is a stateful utility instance that remembers what partitions are discovered already. It also wraps the logic for partition-to-subtask assignment. The main `run()` method now has a discovery loop that calls `AbstractPartitionDiscoverer#discoverPartitions()` on a fixed interval. This method returns only new partitions that should be subscribed by the subtask. The returned partitions are used to invoke `AbstractFetcher#addDiscoveredPartitions(...)` on the fetcher. On a fresh startup, `AbstractPartitionDiscoverer#discoverPartitions()` is also used to fetch the initial seed startup partitions in `open()`. - `AbstractFetcher#addDiscoveredPartitions(...)` The fetcher now has an `unassignedPartitionsQueue` that contains discovered partitions not yet assigned to Kafka consumers to be read. Whenever `addDiscoveredPartitions(...)` is called on the fetcher, the fetcher will create the state holders for the partitions, and add the partitions to the queue. Concrete implementations of the fetcher should continuously poll this queue in the fetch loop. If partitions are found from the queue, they should be assigned for consuming. - Concrete fetchers continuously polls the queue in `runFetchLoop()` For 0.8, this simply means that the original `unassignedPartitionsQueue` in Kafka08Fetcher is moved to the base abstract fetcher class. Nothing else is touched. For 0.9+, queue polling and partition reassignment for the high-level consumer happens in `KafkaConsumerThread`. ## Limitations For the partition discovery to work properly after restores, the previous state had to be migrated to use union list states instead (`OperatorStateStore#getUnionListState()`). One limitation that comes with this migration is that if the current run was restored from old state (Flink 1.1 / 1.2), then partition discovery is disabled. In order to use partition discovery, the user must do a manual snapshot (with the new union state) & restore. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-4022 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3746.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3746 commit 67c3b872cc0129b2f87b565e9fb92908926eac64 Author: Tzu-Li (Gordon) TaiDate: 2017-04-20T09:17:43Z [FLINK-4022] [kafka] Partition / topic discovery for FlinkKafkaConsumer commit bf2dd78e7706b9aae08b75ff917878e3f2ceb68b Author: Tzu-Li (Gordon) Tai Date: 2017-04-20T15:08:39Z [FLINK-4022] Migrate to union list state > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976879#comment-15976879 ] ASF GitHub Bot commented on FLINK-4022: --- Github user tzulitai closed the pull request at: https://github.com/apache/flink/pull/3476 > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher instance too and > take part as a process pool for the consumer group of the subscribed topics. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976875#comment-15976875 ] ASF GitHub Bot commented on FLINK-4022: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3476 Closing this PR in a favor of an updated version .. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher instance too and > take part as a process pool for the consumer group of the subscribed topics. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897191#comment-15897191 ] ASF GitHub Bot commented on FLINK-4022: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3476 Seems like some Kafka tests are failing .. looking into it. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher instance too and > take part as a process pool for the consumer group of the subscribed topics. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896986#comment-15896986 ] ASF GitHub Bot commented on FLINK-4022: --- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/3476 [FLINK-4022] [kafka] Partition and topic pattern discovery for FlinkKafkaConsumer This PR adds partition and topic regex pattern discovery to the `FlinkKafkaConsumer`. It doesn't expose a new constructor that accepts regex topic patterns yet. I propose to expose that with https://issues.apache.org/jira/browse/FLINK-5704 (deprecate the original `FlinkKafkaConsumer` constructors in favor of new ones with new offset behaviours). For this reason, I also propose to update the Kafka documentation when the new constructors are added. ### Design Some description to ease review: - `AbstractPartitionDiscoverer`: A `AbstractPartitionDiscoverer` is a stateful utility instance that remembers what partitions are discovered already. It also wraps the logic for partition-to-subtask assignment. The main `run()` method now has a discovery loop that calls `AbstractPartitionDiscoverer#discoverPartitions()` on a fixed interval. This method returns only new partitions that should be subscribed by the subtask. The returned partitions are used to invoke `AbstractFetcher#addDiscoveredPartitions(...)` on the fetcher. On a fresh startup, `AbstractPartitionDiscoverer#discoverPartitions()` is also used to fetch the initial seed startup partitions in `open()`. - `AbstractFetcher#addDiscoveredPartitions(...)` The fetcher now has a `unassignedPartitionsQueue` that contains discovered partitions not yet consumed by concrete Kafka clients. Whenever `addDiscoveredPartitions(...)` is called on the fetcher, the fetcher will create the state holders for the partitions, and add the partitions to the queue. Concrete implementations of the fetcher should continuously poll this queue in the fetch loop. If partitions are found from the queue, they should be assigned for consuming. - Concrete fetchers continuously polls the queue in `runFetchLoop()` For 0.8, this simply means that the original `unassignedPartitionsQueue` in `Kafka08Fetcher` is moved to the base abstract fetcher class. Nothing else is touched. For 0.9+, queue polling and partition reassignment for the high-level consumer happens in `KafkaConsumerThread`. ### TODOs This PR serves as a preview for the new functionality. Below are some pending TODOs - Currently, partition discovery will not work correctly after restore. The reason for this is explained with `TODO` comments within the `FlinkKafkaConsumerBase#open()` method. For this to work correctly, @rmetzger and I are considering 2 options: 1) use broadcast state, or 2) assign partitions using `maxParallelism` and `assignedKeyGroupIds` instead of subtask index / number of subtasks. - The PR still lacks exactly-once integration tests with Kafka repartitioning / dynamic topics. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-4022 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3476.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3476 commit 469a3c964e13ff559afd1d46933113bcff48eafc Author: Tzu-Li (Gordon) TaiDate: 2017-03-06T09:20:32Z [FLINK-4022] [kafka] Partition and topic pattern discovery for FlinkKafkaConsumer > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15715453#comment-15715453 ] Tzu-Li (Gordon) Tai commented on FLINK-4022: I would like to move along with this task with a first-version implementation for 1.2 that doesn't depend on FLINK-5017. FLINK-5017 most probably need more time for reviews and testing to make it into master, and it might not be a good idea to slow down releasing of user requested features like this one. I'll move this issue up my todo list and prepare a PR over the next few days now. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15632537#comment-15632537 ] Jamie Grier commented on FLINK-4022: Yes, definitely get input from Stephan and/or Aljoscha. There may be a good reason why this simple solution wont' work. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher instance too and > take part as a process pool for the consumer group of the subscribed topics. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15632388#comment-15632388 ] Tzu-Li (Gordon) Tai commented on FLINK-4022: Hi [~jgrier], I actually was thinking about a solution like the one you explained, which I just described in FLINK-4576 also. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher instance too and > take part as a process pool for the consumer group of the subscribed topics. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15632378#comment-15632378 ] Jamie Grier commented on FLINK-4022: Rather than emitting Long.MAX_VALUE for subtasks without partitions or using a global watermark service, can we not just have subtasks without partitions (or in general are not making progress) emit a special value for watermark which means "do not consider this subtask when calculating the current watermark downstream"? The benefit, of course, is that this doesn't require a central service / coordination. Can't this achieve the same thing? When a partition comes on line after this, of course, all of it's data will be considered late -- but basically, what else could you do? > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15630704#comment-15630704 ] Stephan Ewen commented on FLINK-4022: - Ah, okay, this is relevant when there are initially idle subtasks. When there are not, this is not a problem. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher instance too and > take part as a process pool for the consumer group of the subscribed topics. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15629801#comment-15629801 ] Tzu-Li (Gordon) Tai commented on FLINK-4022: Hi [~StephanEwen], Right now, for subtasks that do not have partitions assigned to it, we will emit a max value watermark from that subtask because it will not read any data. However, with partition discovery enabled, subtasks which initially do not have partitions might be assigned one later on and start reading data, and this will mess up the watermarks downstream. We also cannot let idle subtasks not emit any watermarks and just wait for partitions be assigned to it, because then downstream time window states will unboundly accumulate (FLINK-4341). So, the watermark service came around as a mean to let source subtasks that currently do not read any data (thus cannot emit any watermarks for event time) emit the global low watermark across subtasks instead. For this I think we still need the watermark service. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15629341#comment-15629341 ] Stephan Ewen commented on FLINK-4022: - Thinking about this, I am actually not sure that we need the low watermark service: For Event Time: - The actual timestamps are usually part of the data, not derived from the low watermark service - If the timestamps are late, then the data is actually late (partition was discovered late) - Watermarks are derived from the data not some other service. - When the FlinkKafkaConsumer tracks per-partition-watermarks, then the new "currentWatermark" for the new partition in that source instance starts with that source's current watermark - When there is only watermark generation after the FlinkKafkaConsumer, then that one has its watermark anyways and it just sees potentially some late data passes through it For Ingestion time: - Timestamps and watermarks depend purely on the local clock [~tzulitai] and [~aljoscha] What do you think? Why do we still need a watermark service here? > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15629178#comment-15629178 ] Tzu-Li (Gordon) Tai commented on FLINK-4022: This is currently blocked by FLINK-4576, which I'm working on resolving first. For this task in particular there isn't much progress yet. However, I'd also like to target this feature for the 1.2 release. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher instance too and > take part as a process pool for the consumer group of the subscribed topics. -- This message was sent by Atlassian JIRA
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15629091#comment-15629091 ] Jamie Grier commented on FLINK-4022: I also think this would be a great feature and a few Flink users have asked about this -- both dynamic partition discover within one topic and also dynamic topic discovery. Any progress on this? > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.2.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher instance too and > take part as a process pool for the consumer group of the subscribed topics. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367444#comment-15367444 ] Robert Metzger commented on FLINK-4022: --- Hi [~tzulitai], Yes, go ahead and assign the JIRA to yourself. I'm not aware of anybody else working on this. I would prefer to merge this feature after the 1.1 release to give us more time to test it properly. I also think that the implementation is going to be somewhat similar to the Kinesis implementation. I don't expect any issues from Kafka with the constant partition fetching, but we have to do some experiments to find that out. Please decide yourself whether it makes sense to fix the related issues before this one or together with this one. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367094#comment-15367094 ] Tzu-Li (Gordon) Tai commented on FLINK-4022: Related: https://issues.apache.org/jira/browse/FLINK-4023 and https://issues.apache.org/jira/browse/FLINK-4155. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold partitions after a rebalance. > Instead, un-assigned subtasks should be running a fetcher instance too and > take part as a process pool for the consumer group of the subscribed topics. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367086#comment-15367086 ] Tzu-Li (Gordon) Tai commented on FLINK-4022: [~rmetzger] I can start working on this now (but I don't think I'll be able to finish it before RCs for 1.1). Before I get onto it, I'll like to collect anything I should be aware of for the implementation. I think the way to go will mainly be quite the same as the shard discovery in the Kinesis connector. Is there any other Kafka API you know of that we may use for this? Or any implications with constantly fetching partition info from Kafka, like what the Kinesis connector does? > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector, Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > Example: allow users to subscribe to "topic-n*", so that the consumer > automatically reads from "topic-n1", "topic-n2", ... and so on as they are > added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. > 4. If subtasks don't initially have any assigned partitions, we shouldn't > emit MAX_VALUE watermark, since it may hold
[jira] [Commented] (FLINK-4022) Partition discovery / regex topic subscription for the Kafka consumer
[ https://issues.apache.org/jira/browse/FLINK-4022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15315787#comment-15315787 ] Tzu-Li (Gordon) Tai commented on FLINK-4022: I'd like to start working on this feature if it's ok with [~rmetzger], after confirming that this implementation is acceptable. It'll introduce more complexity for state restoring for now, but once merged restore states are available it'll be simple again. On the other hand, if we want to stick to defined partition assignments, then the complexity will probably be in implementing coordination for discovering new partitions / rebalancing between subtasks ourselves. > Partition discovery / regex topic subscription for the Kafka consumer > - > > Key: FLINK-4022 > URL: https://issues.apache.org/jira/browse/FLINK-4022 > Project: Flink > Issue Type: New Feature >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai > Fix For: 1.1.0 > > > Allow users to subscribe to "topic-n*", so that the consumer automatically > reads from "topic-n1", "topic-n2", ... and so on as they are added to Kafka. > I propose to implement this feature by the following description: > Since the overall list of partitions to read will change after job > submission, the main big change required for this feature will be dynamic > partition assignment to subtasks while the Kafka consumer is running. This > will mainly be accomplished using Kafka 0.9.x API > `KafkaConsumer#subscribe(java.util.regex.Pattern, > ConsumerRebalanceListener)`. Each KafkaConsumers in each subtask will be > added to the same consumer group when instantiated, and rely on Kafka to > dynamically reassign partitions to them whenever a rebalance happens. The > registered `ConsumerRebalanceListener` is a callback that is called right > before and after rebalancing happens. We'll use this callback to let each > subtask commit its last offsets of partitions its currently responsible of to > an external store (or Kafka) before a rebalance; after rebalance and the > substasks gets the new partitions it'll be reading from, they'll read from > the external store to get the last offsets for their new partitions > (partitions which don't have offset entries in the store are new partitions > causing the rebalancing). > The tricky part will be restoring Flink checkpoints when the partition > assignment is dynamic. Snapshotting will remain the same - subtasks snapshot > the offsets of partitions they are currently holding. Restoring will be a > bit different in that subtasks might not be assigned matching partitions to > the snapshot the subtask is restored with (since we're letting Kafka > dynamically assign partitions). There will need to be a coordination process > where, if a restore state exists, all subtasks first commit the offsets they > receive (as a result of the restore state) to the external store, and then > all subtasks attempt to find a last offset for the partitions it is holding. > However, if the globally merged restore state feature mentioned by > [~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is > available, then the restore will be simple again, as each subtask has full > access to previous global state therefore coordination is not required. > I think changing to dynamic partition assignment is also good in the long run > for handling topic repartitioning. > Overall, > User-facing API changes: > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > DeserializationSchema, Properties) > - New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, > KeyedDeserializationSchema, Properties) > Implementation changes: > 1. Dynamic partition assigning depending on KafkaConsumer#subscribe > - Remove partition list querying from constructor > - Remove static partition assigning to substasks in run() > - Instead of using KafkaConsumer#assign() in fetchers to manually assign > static partitions, use subscribe() registered with the callback > implementation explained above. > 2. Restoring from checkpointed states > - Snapshotting should remain unchanged > - Restoring requires subtasks to coordinate the restored offsets they hold > before continuing (unless we are able to have merged restore states). > 3. For previous consumer functionality (consume from fixed list of topics), > the KafkaConsumer#subscribe() has a corresponding overload method for fixed > list of topics. We can simply decide which subscribe() overload to use > depending on whether a regex Pattern or list of topics is supplied. -- This message was sent by Atlassian JIRA (v6.3.4#6332)