Dear entity that represents the Flink user community,

To formulate the questions themselves, I need to describe the problem in some detail, so please bear with me for a while.
I have the following execution graph:

KafkaSource -> Message parser -> keyBy -> TCP assembly -> keyBy -> Storage -> keyBy -> Classifier -> KafkaSink

(This is a slightly simplified version.)

When I noticed less than ideal throughput, I ran a profiler, which identified org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(PushingAsyncDataInput$DataOutput) as the dominant function (83% of the time is spent there). 45% of the total time is spent in org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(IOReadableWritable).

Serialization is protobuf via Kryo; according to benchmarks it isn't particularly slow and should be similar to or slightly better than POJO serialization.

The problem, from my point of view, is that serialization shouldn't happen at all unless data is actually sent over the network to another node (in my case I have one job manager and one task manager). However, I suspect that the keyBy operation implicitly forces serialization and deserialization.

First question: in this particular case the key is exactly the same for every keyBy. Is there any way, other than combining the operations into a single operator, to avoid the performance impact of the keyBy chain?

Second question: can I use a process function after keyBy in such a way that the result is not merged back into a plain stream, i.e. it continues to be a KeyedStream?

Third question: can I somehow specify that a sequence of operators must be executed in the same thread, without serialization/deserialization in between?

Best regards,
Alexander