Hi Andrew and Martijn,

Thanks for looping me in, this is an interesting discussion! I'm trying to solve a higher-level problem about Kafka topic routing/assignment with FLIP-246. The main idea is that an external service can provide the coordination between Kafka and Flink to dynamically change clusters and topics, but the design doesn't get into partition granularity.
I'm going to try to answer some of your questions; let me know if I have missed something.

1.

> I don't think running a single Flink application cross DC would work well;
> there's too much inter-node traffic happening, and the Flink tasks don't
> have any DC awareness.

I agree. Running a single Flink application cross DC tends to increase costs significantly with public cloud providers, which charge for cross-DC traffic. You can follow [1] to allow the KafkaSource to fully utilize Kafka rack awareness.

2.

> But, this got me wondering...would it be possible to run a streaming app
> in an active/active mode, where in normal operation, half of the work was
> being done in each DC, and in failover, all of the work would automatically
> failover to the online DC.
>
> But would it be possible to run two separate streaming applications in
> each DC, but in the *same Kafka consumer group*? I believe that, if the
> streaming app was using Kafka's usual consumer assignment and rebalancing
> protocol, it would. Kafka would just see clients connecting from either DC
> in the same consumer group, and assign each consumer an equal number of
> partitions to consume, resulting in equal partition balancing in DCs. If
> we shut down one of the streaming apps, Kafka would automatically rebalance
> the consumers in the consumer group, assigning all of the work to the
> remaining streaming app in the other DC.

Your investigation is correct: the KafkaSource doesn't support Kafka's usual consumer assignment and rebalancing protocol. Technically, you can already do this with the PartitionSetSubscriber [2] and split up the available partitions between different Flink jobs. A challenge here is that adding partitions would be difficult: you can no longer use the KafkaSource's dynamic partition discovery [3], so partition changes will require a Flink job restart. I think managing partitions like this would be overkill.
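On the rack awareness mentioned in point 1: for context, this is roughly what the client/broker configuration for follower fetching (KIP-392) looks like. The rack ids below are placeholders; in a stretch cluster you would typically use the DC name as the "rack".

```properties
# Consumer side: declare which "rack" (here, DC) this client runs in.
# With follower fetching, the consumer reads from the closest replica.
client.rack=dc1

# Broker side (shown for context): each broker advertises its rack, and
# the rack-aware replica selector must be enabled for follower fetching.
broker.rack=dc1
replica.selector.class=org.apache.kafka.common.replication.RackAwareReplicaSelector
```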
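To make the static assignment in point 2 concrete, here is a toy sketch (plain Java, no Flink dependencies, all numbers made up) of splitting a topic's partitions round-robin between two jobs. Each resulting set is what you would hand to KafkaSourceBuilder#setPartitions, which is the builder hook backed by the PartitionSetSubscriber [2].

```java
import java.util.ArrayList;
import java.util.List;

public class StaticPartitionSplit {

    // Round-robin split of a topic's partition ids across a fixed number of
    // Flink jobs. Each job would pass its share (as TopicPartitions) to
    // KafkaSourceBuilder#setPartitions instead of subscribing by topic.
    static List<List<Integer>> split(int numPartitions, int numJobs) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int j = 0; j < numJobs; j++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(p % numJobs).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Hypothetical 8-partition topic, one job per DC.
        List<List<Integer>> byJob = split(8, 2);
        System.out.println("dc1 job: " + byJob.get(0)); // [0, 2, 4, 6]
        System.out.println("dc2 job: " + byJob.get(1)); // [1, 3, 5, 7]
    }
}
```

The downside stands as described above: the assignment is fixed at submission time, so adding partitions means recomputing the sets and restarting both jobs.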
Another challenge you will face is balancing the traffic across DCs: you can compute metrics (e.g. the throughput of a Flink job's assigned Kafka partitions) to determine which DC to deploy each Flink job in to keep the load balanced.

3.

> I got excited about this possibility, only to learn that Flink's
> KafkaSource does not use Kafka for consumer assignment. I think I
> understand why it does this: the Source API can do a lot more than Kafka,
> so having some kind of state management (offsets) and task assignment
> (Kafka consumer balance protocol) outside of the usual Flink Source would
> be pretty weird. Implementing offset and task assignment inside of the
> KafkaSource allows it to work like any other Source implementation.

This is totally correct. The Source API can also pause and resume Kafka consumers for watermark alignment, but you shouldn't face this issue since your applications don't join.

All that being said, it is possible to adapt this in the FLIP-246 design. Another benefit of FLIP-246 is doing this coordination without a full Flink job restart: the design could potentially be extended to detect changes in DC assignment (if we need to do this dynamically) and reconcile the Flink jobs without a restart, but again this is at the cluster/topic granularity. This is the FLIP doc [4] and discussion thread [5] if you want to leave feedback.

I hope this helps, and please let me know if you have any other questions!
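As an illustrative sketch of that balancing step: sum the observed throughput of the partitions each DC's jobs currently own, then place the next job in the least-loaded DC. The numbers and DC names below are hypothetical; in practice the values would come from your metrics system.

```java
import java.util.Map;

public class DcPlacement {

    // Pick the DC with the lowest aggregate throughput (records/sec) across
    // the Kafka partitions currently assigned to its Flink jobs.
    static String pickLeastLoadedDc(Map<String, Double> throughputByDc) {
        return throughputByDc.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .orElseThrow(() -> new IllegalArgumentException("no DCs"));
    }

    public static void main(String[] args) {
        // Hypothetical per-DC load, e.g. summed from partition-level metrics.
        Map<String, Double> load = Map.of("dc1", 1200.0, "dc2", 800.0);
        System.out.println(pickLeastLoadedDc(load)); // dc2
    }
}
```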
Best,
Mason

[1] https://issues.apache.org/jira/browse/FLINK-29398
[2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/PartitionSetSubscriber.java
[3] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#dynamic-partition-discovery
[4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source
[5] https://lists.apache.org/thread/vz7nw5qzvmxwnpktnofc9p13s1dzqm6z

On Wed, Oct 5, 2022 at 8:42 AM Martijn Visser <martijnvis...@apache.org> wrote:

> Hi Andrew,
>
> While definitely no expert on this topic, my first thought was if this
> idea could be solved with the idea that was proposed in FLIP-246
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source
>
> I'm also looping in Mason Chen who was the initiator of that FLIP :)
>
> Best regards,
>
> Martijn
>
> On Wed, Oct 5, 2022 at 10:00 AM Andrew Otto <o...@wikimedia.org> wrote:
>
>> (Ah, note that I am considering very simple streaming apps here, e.g.
>> event enrichment apps. No windowing or complex state. The only state is
>> the Kafka offsets, which I suppose would also have to be managed from
>> Kafka, not from Flink state.)
>>
>> On Wed, Oct 5, 2022 at 9:54 AM Andrew Otto <o...@wikimedia.org> wrote:
>>
>>> Hi all,
>>>
>>> *tl;dr: Would it be possible to make a KafkaSource that uses Kafka's
>>> built in consumer assignment for Flink tasks?*
>>>
>>> At the Wikimedia Foundation we are evaluating
>>> <https://phabricator.wikimedia.org/T307944> whether we can use a Kafka
>>> 'stretch' cluster to simplify the multi-datacenter deployment architecture
>>> of streaming applications.
>>>
>>> A Kafka stretch cluster is one in which the brokers span multiple
>>> datacenters, relying on the usual Kafka broker replication for multi DC
>>> replication (rather than something like Kafka MirrorMaker). This is
>>> feasible with Kafka today mostly because of follower fetching
>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica>
>>> support, allowing consumers to be assigned to consume from partitions that
>>> are 'closest' to them, e.g. in the same 'rack' (or DC :) ).
>>>
>>> Having a single Kafka cluster makes datacenter failover for streaming
>>> applications a little bit simpler, as there is only one set of offsets to
>>> use when saving state. We can run a streaming app in active/passive mode.
>>> This would allow us to stop the app in one datacenter, and then start it up
>>> in another, using the same state snapshot and same Kafka cluster.
>>>
>>> But, this got me wondering...would it be possible to run a streaming app
>>> in an active/active mode, where in normal operation, half of the work was
>>> being done in each DC, and in failover, all of the work would automatically
>>> failover to the online DC.
>>>
>>> I don't think running a single Flink application cross DC would work
>>> well; there's too much inter-node traffic happening, and the Flink tasks
>>> don't have any DC awareness.
>>>
>>> But would it be possible to run two separate streaming applications in
>>> each DC, but in the *same Kafka consumer group*? I believe that, if the
>>> streaming app was using Kafka's usual consumer assignment and rebalancing
>>> protocol, it would. Kafka would just see clients connecting from either DC
>>> in the same consumer group, and assign each consumer an equal number of
>>> partitions to consume, resulting in equal partition balancing in DCs. If
>>> we shut down one of the streaming apps, Kafka would automatically rebalance
>>> the consumers in the consumer group, assigning all of the work to the
>>> remaining streaming app in the other DC.
>>>
>>> I got excited about this possibility, only to learn that Flink's
>>> KafkaSource does not use Kafka for consumer assignment. I think I
>>> understand why it does this: the Source API can do a lot more than Kafka,
>>> so having some kind of state management (offsets) and task assignment
>>> (Kafka consumer balance protocol) outside of the usual Flink Source would
>>> be pretty weird. Implementing offset and task assignment inside of the
>>> KafkaSource allows it to work like any other Source implementation.
>>>
>>> However, this active/active multi DC streaming app idea seems pretty
>>> compelling, as it would greatly reduce operator/SRE overhead. It seems to
>>> me that any Kafka streaming app that did use Kafka's built in consumer
>>> assignment protocol (like Kafka Streams) would be deployable in this way.
>>> But in Flink this is not possible because of the way it assigns tasks.
>>>
>>> I'm writing this email to see what others think about this, and wonder
>>> if it might be possible to implement a KafkaSource that assigned tasks
>>> using Kafka's usual consumer assignment protocol. Hopefully someone more
>>> knowledgeable about Sources and TaskSplits, etc. could advise here.
>>>
>>> Thank you!
>>>
>>> - Andrew Otto
>>>   Wikimedia Foundation