Hey,
I have a Flink job with a default parallelism of 2. I want to key the stream and then apply a flatMap to the keyed stream. The flatMap operation is quite costly, so I want a much higher parallelism for it (let's say 16). Additionally, it is important that the flatMap operation for a given key is always executed in the same process or the same task.

I have the following code:

----
env.setParallelism(2)
val input: DataStream[Event] = /* from somewhere */
input.keyBy(_.eventType).flatMap(new ExpensiveOperation()).print()
----

This works fine: "ExpensiveOperation" is always executed on the same task for a given key.

Now I tried two things:

1.
----
env.setParallelism(2)
val input: DataStream[Event] = /* from somewhere */
input.keyBy(_.eventType).setParallelism(16).flatMap(new ExpensiveOperation()).print()
----
This fails with an exception because I can't set the parallelism on the keyBy operator.

2.
----
env.setParallelism(2)
val input: DataStream[Event] = /* from somewhere */
input.keyBy(_.eventType).flatMap(new ExpensiveOperation()).setParallelism(16).print()
----
This runs, but it breaks the assignment of keys to tasks: "ExpensiveOperation" is no longer always executed on the same node for a given key (visible from the prefixes in the print() output).
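
The print() prefixes show the index of the print sink's subtask, which is not necessarily the subtask that ran the flatMap. One way to check the key assignment independently of those prefixes would be to report the subtask index from inside the flatMap itself. The following is only a minimal sketch under assumptions: it uses the Flink 1.x Scala DataStream API, the Event case class and its fields are hypothetical (the real definition is not shown here), and TaggingOperation is an illustrative stand-in for ExpensiveOperation.

----
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical event type; the real Event definition is not shown in this post.
case class Event(eventType: String, payload: String)

// Illustrative stand-in for ExpensiveOperation: emits the index of the
// flatMap subtask that processed each record, together with the record's key.
class TaggingOperation extends RichFlatMapFunction[Event, String] {
  override def flatMap(value: Event, out: Collector[String]): Unit = {
    val subtask = getRuntimeContext.getIndexOfThisSubtask
    out.collect(s"flatMap subtask $subtask handled key ${value.eventType}")
  }
}

object SubtaskCheck {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    // Small in-memory input as a placeholder for the real source.
    val input: DataStream[Event] = env.fromElements(
      Event("a", "1"), Event("b", "2"), Event("a", "3"),
      Event("c", "4"), Event("b", "5"))

    input
      .keyBy(_.eventType)
      .flatMap(new TaggingOperation())
      .setParallelism(16) // applies to the flatMap only; print() keeps the default of 2
      .print()

    env.execute("subtask-check")
  }
}
----

If every record of a given key reports the same flatMap subtask index, the keyed assignment is intact, and varying print() prefixes would only reflect how records are redistributed from the 16 flatMap subtasks to the 2 print subtasks.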

What am I doing wrong? Is the only option to set the parallelism of the whole Flink job to 16?

Thanks, and have a nice holiday,
Dominik
