Hi all,

*tl;dr: Would it be possible to make a KafkaSource that uses Kafka's
built-in consumer assignment for Flink tasks?*

At the Wikimedia Foundation we are evaluating whether we can use a Kafka
'stretch' cluster <https://phabricator.wikimedia.org/T307944> to simplify
the multi-datacenter deployment architecture of streaming applications.

A Kafka stretch cluster is one in which the brokers span multiple
datacenters, relying on ordinary Kafka broker replication to copy data
between DCs (rather than something like Kafka MirrorMaker).  This is
feasible with Kafka today mostly because of follower fetching
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica>
support, which allows consumers to fetch from the partition replicas that
are 'closest' to them, e.g. in the same 'rack' (or DC :) ).
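For reference, follower fetching is enabled with a couple of standard Kafka
settings, treating each DC as a 'rack' (the rack names below are just
placeholders):

```properties
# Broker config (per broker; use the broker's DC as its 'rack'):
broker.rack=dc1
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

# Consumer config (fetch from a replica in the consumer's own DC when possible):
client.rack=dc1
```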

Having a single Kafka cluster makes datacenter failover for streaming
applications a little bit simpler, as there is only one set of offsets to
use when saving state.  We can run a streaming app in active/passive mode.
This would allow us to stop the app in one datacenter, and then start it up
in another, using the same state snapshot and same Kafka cluster.

But this got me wondering: would it be possible to run a streaming app in
an active/active mode, where in normal operation half of the work is done
in each DC, and on failure all of the work automatically fails over to the
online DC?

I don't think running a single Flink application cross-DC would work well;
there's too much inter-node traffic, and Flink tasks don't have any DC
awareness.

But would it be possible to run two separate streaming applications, one in
each DC, in the *same Kafka consumer group*? I believe that, if the
streaming app were using Kafka's usual consumer assignment and rebalancing
protocol, it would work.  Kafka would just see clients connecting from
either DC in the same consumer group, and assign each consumer an equal
number of partitions to consume, resulting in an even partition balance
across DCs.  If we shut down one of the streaming apps, Kafka would
automatically rebalance the consumers in the consumer group, assigning all
of the work to the remaining streaming app in the other DC.
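To make the rebalancing behavior concrete, here is a toy model of
round-robin partition assignment (purely illustrative; the real work is
done by Kafka's group coordinator and the configured partition assignor,
not by this sketch).  With consumers from both DCs in the group, partitions
split evenly; when one DC's consumers leave, the survivors pick up
everything:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RebalanceSketch {

    // Toy round-robin assignment: partition p goes to consumer p % size.
    // This only mimics what Kafka's coordinator does, for illustration.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        consumers.forEach(c -> assignment.put(c, new ArrayList<>()));
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(consumers.get(p % consumers.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        int partitions = 8;

        // Normal operation: one app per DC, two consumer instances each,
        // all members of the same consumer group.
        List<String> both = List.of("dc1-a", "dc1-b", "dc2-a", "dc2-b");
        System.out.println(assign(both, partitions));
        // {dc1-a=[0, 4], dc1-b=[1, 5], dc2-a=[2, 6], dc2-b=[3, 7]}

        // dc1 goes down: a rebalance hands everything to dc2's consumers.
        List<String> dc2Only = List.of("dc2-a", "dc2-b");
        System.out.println(assign(dc2Only, partitions));
        // {dc2-a=[0, 2, 4, 6], dc2-b=[1, 3, 5, 7]}
    }
}
```

A plain KafkaConsumer gets this behavior for free just by calling
subscribe() with a shared group.id; the question is whether a Flink Source
could participate in the same way.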

I got excited about this possibility, only to learn that Flink's
KafkaSource does not use Kafka for consumer assignment.  I think I
understand why: the Source API can do a lot more than Kafka, so having
some kind of state management (offsets) and task assignment (the Kafka
consumer rebalance protocol) live outside of the usual Flink Source
machinery would be pretty weird.  Implementing offset tracking and split
assignment inside the KafkaSource allows it to work like any other Source
implementation.

However, this active/active multi-DC streaming app idea seems pretty
compelling, as it would greatly reduce operator/SRE overhead.  It seems to
me that any Kafka streaming app that does use Kafka's built-in consumer
assignment protocol (like Kafka Streams) would be deployable in this way.
But in Flink this is not possible because of the way it assigns work to
tasks.

I'm writing this email to see what others think about this, and to ask
whether it might be possible to implement a KafkaSource that assigned tasks
using Kafka's usual consumer assignment protocol.  Hopefully someone more
knowledgeable about Sources, SplitEnumerators, etc. could advise here.

Thank you!

- Andrew Otto
  Wikimedia Foundation
