Hi all

We have a pipeline (running on YARN, Flink v1.7.1) that consumes a union of
Kafka and HDFS sources. We noticed that the throughput is 10 times higher if
only one of these sources is consumed. While trying to identify the problem, I
implemented a no-op source and unioned it with one of the real sources:

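  // GenericRecord is assumed to be Avro's org.apache.avro.generic.GenericRecord.
  import org.apache.avro.generic.GenericRecord
  import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
  import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

  // Emits nothing: run() returns immediately, so the source finishes at once.
  class NoOpSourceFunction extends ParallelSourceFunction[GenericRecord] {
    override def run(ctx: SourceContext[GenericRecord]): Unit = {}
    override def cancel(): Unit = {}
  }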

  mainStream.union(env.addSource(new NoOpSourceFunction()))

I noticed that whenever I union the stream with any source like the one above,
or with itself, I get the same performance hit.
When I compare the job graphs in the Flink UI, the only difference is that with
a union the two sources aren't chained to the downstream operators
(transformation steps); both are connected to them with
ship_strategy: FORWARD.
When only one source is present, it is chained to the transformation
steps.
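
For anyone who wants to reproduce the comparison without our sources, something
like the following should do. It is only a minimal sketch, not our actual job:
the String elements, the map step and the object name ComparePlans are
placeholders I made up. It builds the same tiny pipeline once with and once
without a union and prints the JSON plan of each (the format the plan
visualizer accepts) without executing anything.

```scala
import org.apache.flink.streaming.api.scala._

object ComparePlans {

  // Builds a tiny pipeline and returns its plan as JSON, without running it.
  def plan(withUnion: Boolean): String = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder for the real Kafka/HDFS source.
    val mainStream: DataStream[String] = env.fromElements("a", "b")

    // Optionally union with a second, unrelated placeholder source.
    val input =
      if (withUnion) mainStream.union(env.fromElements("c")) else mainStream

    // Placeholder for the downstream transformation steps.
    input.map(_.toUpperCase).print()

    env.getExecutionPlan
  }

  def main(args: Array[String]): Unit = {
    println(plan(withUnion = false))
    println(plan(withUnion = true))
  }
}
```

The two JSON documents can then be diffed or pasted into the plan visualizer
side by side.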

To avoid the union (and/or forward partitioning), I tried connecting the streams
with a CoFlatMapFunction to get the same result, but without any gain in
performance. I was also thinking about reading the HDFS stream in parallel and
using iterate to feed it back to a previous operator.
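
The connect-based variant can be sketched roughly as below. This is a
simplified illustration under assumptions, not the code from our job: the
PassThrough class, the String element type and the fromElements sources are
placeholders standing in for the real GenericRecord streams.

```scala
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Forwards records from both inputs unchanged, which mimics a union.
class PassThrough[T] extends CoFlatMapFunction[T, T, T] {
  override def flatMap1(value: T, out: Collector[T]): Unit = out.collect(value)
  override def flatMap2(value: T, out: Collector[T]): Unit = out.collect(value)
}

object ConnectInsteadOfUnion {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholders for the Kafka and HDFS streams.
    val kafkaLike: DataStream[String] = env.fromElements("k1", "k2")
    val hdfsLike: DataStream[String] = env.fromElements("h1", "h2")

    // connect + CoFlatMapFunction instead of union.
    kafkaLike
      .connect(hdfsLike)
      .flatMap(new PassThrough[String])
      .print()

    env.execute("connect-instead-of-union")
  }
}
```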

After some trial and error, I'd like to ask for your advice. What is the
best practice here? Which options / tools are there to analyze the
execution plan apart from the Flink plan visualizer and the provided web UI?

Thanks
Peter
