Hi all,

We have a pipeline (running on YARN, Flink v1.7.1) that consumes a union of Kafka and HDFS sources. We noticed that the throughput is 10 times higher if only one of these sources is consumed. While trying to identify the problem, I implemented a no-op source and unioned it with one of the real sources:

    import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

    class NoOpSourceFunction extends ParallelSourceFunction[GenericRecord] {
      // Emits nothing: run() returns immediately.
      override def run(ctx: SourceContext[GenericRecord]): Unit = {}
      override def cancel(): Unit = {}
    }

    mainStream.union(env.addSource(new NoOpSourceFunction()))

I noticed that whenever I use a union with any source like the one above, or union the stream with itself, I get the same performance hit.

When I compare the job graphs in the Flink UI, the only difference is that with a union the two sources aren't chained to the subsequent downstream operators (the transformation steps); both are connected to them with ship_strategy: FORWARD. When only one source is present, it is chained to the transformation steps.

To avoid the union (and/or forward partitioning), I tried connecting the streams with a CoFlatMapFunction to get the same result, but without any gain in performance. I was also thinking about reading the HDFS stream in parallel and using the iterate function to feed it back to a previous operator.

After some trial and error, I'd like to ask for your advice. What is the best practice here? Which options and tools are there for analyzing the execution plan, apart from the Flink plan visualizer and the provided web UI?

Thanks,
Peter