Dear entity that represents the Flink user community,

To formulate the question, I first need to describe the problem in some
detail, so please bear with me.

I have the following execution graph:

KafkaSource -> Message parser -> keyBy -> TCP assembly -> keyBy -> Storage
-> keyBy -> Classifier -> KafkaSink (this is a slightly simplified version)

When I noticed less-than-ideal throughput, I ran a profiler, which
identified
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(PushingAsyncDataInput$DataOutput)
as the main hotspot (83% of the time is spent there). 45% of the total time
is spent in
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(IOReadableWritable).


Serialization is protobuf with Kryo. According to benchmarks it isn't
particularly slow; it should be similar to or slightly better than POJO
serialization.

From my point of view, the problem is that serialization shouldn't happen
at all unless data is actually sent over the network to another node (in my
case I have one job manager and one task manager).

However, I suspect that the keyBy operation implicitly forces
serialization and deserialization.
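
To illustrate what I suspect is happening, here is a toy sketch in plain
Java. It is not Flink code: the Packet type, the class name and both methods
are made up for illustration, and the hand-written byte encoding merely
stands in for whatever serializer is really used. It only contrasts handing
a record to the next operator by reference with writing it to bytes and
reading it back once per keyBy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class HopCostSketch {

    /** Hypothetical record type standing in for a parsed message. */
    static final class Packet {
        final String flowKey;
        final byte[] payload;

        Packet(String flowKey, byte[] payload) {
            this.flowKey = flowKey;
            this.payload = payload;
        }
    }

    /** Same-thread hand-over: the next operator receives the same object. */
    static Packet chained(Packet p) {
        return p;
    }

    /** Assumed keyBy boundary: the record is written to bytes and rebuilt. */
    static Packet viaSerializedHop(Packet p) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buf)) {
                out.writeUTF(p.flowKey);
                out.writeInt(p.payload.length);
                out.write(p.payload);
            }
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(buf.toByteArray()))) {
                String key = in.readUTF();
                byte[] payload = new byte[in.readInt()];
                in.readFully(payload);
                return new Packet(key, payload);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Packet p = new Packet("10.0.0.1:443", new byte[1024]);
        int records = 200_000;
        int hops = 3; // one per keyBy in the graph above

        long t0 = System.nanoTime();
        Packet last = p;
        for (int i = 0; i < records; i++) {
            last = p;
            for (int h = 0; h < hops; h++) {
                last = chained(last);
            }
        }
        long t1 = System.nanoTime();
        for (int i = 0; i < records; i++) {
            last = p;
            for (int h = 0; h < hops; h++) {
                last = viaSerializedHop(last);
            }
        }
        long t2 = System.nanoTime();

        System.out.println("by reference:    " + (t1 - t0) / 1_000_000 + " ms");
        System.out.println("serialized hops: " + (t2 - t1) / 1_000_000
                + " ms (payload bytes: " + last.payload.length + ")");
    }
}
```

The absolute numbers from such a toy mean little; the point is only that
each hop allocates and copies a full record, which would be consistent with
the profile above if my suspicion about keyBy is right.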

First question: in this particular case the key is exactly the same for
every keyBy. Is there any way, other than combining the operations into a
single operator, to avoid the performance impact of the keyBy chain?

Second question: could I use a process function after keyBy in such a way
that it does not merge the stream back, i.e. the result continues to be a
KeyedStream?

Third question: could I somehow specify that a sequence of operators must
be executed in the same thread, without serialization/deserialization
in between?


Best regards,
Alexander
