Tzu-Li (Gordon) Tai created FLINK-4022:
------------------------------------------

             Summary: Partition discovery / regex topic subscription for the 
Kafka consumer
                 Key: FLINK-4022
                 URL: https://issues.apache.org/jira/browse/FLINK-4022
             Project: Flink
          Issue Type: New Feature
    Affects Versions: 1.0.0
            Reporter: Tzu-Li (Gordon) Tai
             Fix For: 1.1.0


Allow users to subscribe to "topic-n*", so that the consumer automatically 
reads from "topic-n1", "topic-n2", ... and so on as they are added to Kafka.
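
One detail worth noting: "topic-n*" above is glob-style shorthand. Read as a 
`java.util.regex.Pattern`, it means "topic-" followed by zero or more 'n' 
characters, so the subscription would more likely be written as "topic-n.*". 
A minimal sketch, assuming topic names are matched in full against the 
pattern (the class and method names are hypothetical and only for 
illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class TopicPatternExample {
    // Returns the topics whose full name matches the pattern.
    static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        return topics.stream()
                .filter(t -> pattern.matcher(t).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("topic-n1", "topic-n2", "topic-m1", "topic-n");
        // Glob-style spelling, interpreted as a regex: does not match "topic-n1".
        System.out.println(matchingTopics(Pattern.compile("topic-n*"), topics));
        // Regex spelling of the intended subscription.
        System.out.println(matchingTopics(Pattern.compile("topic-n.*"), topics));
    }
}
```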

I propose implementing this feature as follows:

Since the overall list of partitions to read will change after job submission, 
the main change required for this feature is dynamic partition assignment to 
subtasks while the Kafka consumer is running. This will mainly be accomplished 
using the Kafka 0.9.x API 
`KafkaConsumer#subscribe(java.util.regex.Pattern, ConsumerRebalanceListener)`. 
The KafkaConsumer in each subtask will be added to the same consumer group when 
instantiated, and will rely on Kafka to dynamically reassign partitions to it 
whenever a rebalance happens. The registered `ConsumerRebalanceListener` is a 
callback that is called right before and right after a rebalance. We'll use 
this callback to let each subtask commit the last offsets of the partitions it 
is currently responsible for to an external store (or Kafka) before a 
rebalance. After the rebalance, once the subtasks have been assigned their new 
partitions, they'll read from the external store to get the last offsets for 
those partitions (partitions that have no offset entry in the store are the 
new partitions that caused the rebalance).
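
To make the offset hand-off concrete, here is a rough sketch in plain Java. 
It does not use the Kafka client: `OffsetStore` and `SubtaskRebalanceHandler` 
are hypothetical stand-ins, partitions are plain strings, and starting new 
partitions at offset 0 is an assumption made only for this example. The two 
callback methods are merely shaped like the revoke/assign hooks of a 
rebalance listener.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the external offset store (or Kafka itself).
class OffsetStore {
    private final Map<String, Long> committed = new ConcurrentHashMap<>();

    void commit(String partition, long offset) { committed.put(partition, offset); }

    // Returns null when no offset has been committed for the partition.
    Long lookup(String partition) { return committed.get(partition); }
}

// Hypothetical per-subtask handler, shaped like a rebalance callback.
class SubtaskRebalanceHandler {
    // Assumption: partitions without a stored offset start at 0.
    static final long NEW_PARTITION_START = 0L;

    private final OffsetStore store;
    private final Map<String, Long> offsets = new HashMap<>();

    SubtaskRebalanceHandler(OffsetStore store) { this.store = store; }

    // Called by the fetch loop as records are processed.
    void recordProgress(String partition, long offset) { offsets.put(partition, offset); }

    // Before the rebalance: hand off the last offset of each partition given up.
    void onPartitionsRevoked(Collection<String> partitions) {
        for (String p : partitions) {
            Long last = offsets.remove(p);
            if (last != null) store.commit(p, last);
        }
    }

    // After the rebalance: resume from the store; no entry means a new partition.
    void onPartitionsAssigned(Collection<String> partitions) {
        for (String p : partitions) {
            Long stored = store.lookup(p);
            offsets.put(p, stored != null ? stored : NEW_PARTITION_START);
        }
    }

    Map<String, Long> offsets() { return offsets; }
}
```

In this sketch, a partition revoked from one subtask at offset 42 is picked 
up by another subtask at offset 42, while a partition the store has never 
seen starts from the assumed default.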

The tricky part will be restoring Flink checkpoints when the partition 
assignment is dynamic. Snapshotting will remain the same: subtasks snapshot 
the offsets of the partitions they are currently holding. Restoring will be a 
bit different, because the partitions assigned to a subtask might not match 
those in the snapshot the subtask is restored with (since we're letting Kafka 
assign partitions dynamically). There will need to be a coordination process 
where, if a restore state exists, all subtasks first commit the offsets they 
receive (as a result of the restore state) to the external store, and then 
each subtask looks up the last offset for the partitions it is holding.

However, if the globally merged restore state feature mentioned by 
[~StephanEwen] in https://issues.apache.org/jira/browse/FLINK-3231 is 
available, then the restore will be simple again: each subtask has full 
access to the previous global state, so coordination is not required.
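
For the case without merged restore state, the two-step coordination could 
look roughly like the sketch below. This is plain Java under stated 
assumptions: the `RestoreCoordination` class, its method names, and the 
`defaultOffset` parameter are hypothetical, and a simple map stands in for 
the external store. How the subtasks would actually wait for each other 
between the two steps is left open here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RestoreCoordination {
    // Step 1: every subtask publishes the offsets from its restored snapshot.
    // The returned map stands in for the external store.
    static Map<String, Long> publishRestoredOffsets(List<Map<String, Long>> restoredPerSubtask) {
        Map<String, Long> store = new HashMap<>();
        for (Map<String, Long> restored : restoredPerSubtask) {
            store.putAll(restored);
        }
        return store;
    }

    // Step 2 (only after step 1 has finished on all subtasks): a subtask looks
    // up start offsets for whatever partitions it has been assigned.
    static Map<String, Long> resolveStartOffsets(
            Map<String, Long> store, List<String> assigned, long defaultOffset) {
        Map<String, Long> start = new HashMap<>();
        for (String p : assigned) {
            start.put(p, store.getOrDefault(p, defaultOffset));
        }
        return start;
    }
}
```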

I think changing to dynamic partition assignment is also good in the long run 
for handling topic repartitioning.

Overall, the changes would be:

User-facing API changes:

- New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern, 
DeserializationSchema, Properties)
- New constructor - FlinkKafkaConsumer09(java.util.regex.Pattern,
KeyedDeserializationSchema, Properties)


Implementation changes:

1. Dynamic partition assignment based on KafkaConsumer#subscribe
- Remove partition list querying from the constructor
- Remove static partition assignment to subtasks in run()
- Instead of using KafkaConsumer#assign() in fetchers to manually assign static 
partitions, use subscribe() registered with the callback implementation 
explained above.

2. Restoring from checkpointed states
- Snapshotting should remain unchanged
- Restoring requires subtasks to coordinate the restored offsets they hold 
before continuing (unless we are able to have merged restore states).

3. For the existing consumer functionality (consuming from a fixed list of 
topics), KafkaConsumer#subscribe() has a corresponding overload that takes a 
fixed list of topics. We can simply decide which subscribe() overload to use 
depending on whether a regex Pattern or a list of topics is supplied.
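
The overload selection in item 3 might be sketched as follows. The 
`SubscribingClient` interface is a hypothetical stand-in that only mirrors 
the shape of the two subscribe() variants (without the rebalance listener 
argument), and `TopicsDescriptor` is likewise an illustrative name, not an 
existing Flink class.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical stand-in mirroring the two subscribe() variants.
interface SubscribingClient {
    void subscribe(List<String> topics);
    void subscribe(Pattern pattern);
}

// Holds either a fixed topic list or a regex pattern, never both.
class TopicsDescriptor {
    private final List<String> fixedTopics; // null when a pattern is used
    private final Pattern topicPattern;     // null when a fixed list is used

    private TopicsDescriptor(List<String> fixedTopics, Pattern topicPattern) {
        this.fixedTopics = fixedTopics;
        this.topicPattern = topicPattern;
    }

    static TopicsDescriptor ofTopics(List<String> topics) {
        return new TopicsDescriptor(topics, null);
    }

    static TopicsDescriptor ofPattern(Pattern pattern) {
        return new TopicsDescriptor(null, pattern);
    }

    // Picks the overload that matches what the user supplied.
    void subscribeWith(SubscribingClient client) {
        if (topicPattern != null) {
            client.subscribe(topicPattern);
        } else {
            client.subscribe(fixedTopics);
        }
    }
}
```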



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
