Hello,

Due to a malformed message in our input queue (Kafka), our DeserialisationSchema threw an exception, which crashed our Flink application. Because the application was configured to restart, it restarted, only to reprocess the same malformed message and crash again.

This repeated for a while until we fixed the job and made the DeserialisationSchema return null on malformed messages. After that, we restarted the job from the last successful savepoint (taken just before the malformed message) and got a huge number of duplicate messages from our processors (seemingly one duplicate for every time Flink had restarted).
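To make the fix concrete, here is a minimal sketch of the "return null on malformed input" idea. It is plain Java with no Flink dependency: TolerantDeserializer and the "key=value" payload format are made up for illustration, and the method only mimics the shape of a deserialize(byte[]) implementation, it is not our actual schema.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the body of a deserialize(byte[]) method.
// The "key=value" wire format is an assumption made for this example only.
final class TolerantDeserializer {

    // Returns {key, value} for a well-formed payload, or null when the
    // payload is malformed, instead of throwing and failing the job.
    static String[] deserialize(byte[] message) {
        if (message == null) {
            return null;
        }
        String text = new String(message, StandardCharsets.UTF_8);
        int sep = text.indexOf('=');
        // Malformed: no separator, empty key, or empty value.
        if (sep <= 0 || sep == text.length() - 1) {
            return null;
        }
        return new String[] { text.substring(0, sep), text.substring(sep + 1) };
    }

    public static void main(String[] args) {
        String[] ok = deserialize("id=42".getBytes(StandardCharsets.UTF_8));
        System.out.println(ok[0] + " -> " + ok[1]);
        System.out.println(deserialize("garbage".getBytes(StandardCharsets.UTF_8)) == null);
    }
}
```

The caller has to treat null as "skip this record"; as far as I understand, the Flink Kafka consumer does this for a null returned from the schema.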

In addition, one of our map functions has side effects, and these side effects were also executed a huge number of times. We basically have this topology:

kafka source --> stateful process function with event time timers --> (slow) map function with side effect (post to external API) --> kafka sink (at least once)

We don't use the exactly-once sink (we use at-least-once instead), because an occasional duplicate would not harm us and we need low latency. However, a massive number of duplicates is a problem.

So I'm trying to understand how checkpoints and savepoints really work, and in what situation we could end up with a massive number of duplicates. The only explanation I could think of is the following scenario:

 * the application starts up
 * the stateful process function processes some incoming messages from
   Kafka and generates some outgoing messages
 * the slow map function starts processing these messages, and at the
   same time a checkpoint is saved (somehow without new Kafka offsets ???)
 * the application crashes on the malformed input

then again:

 * the application restarts
 * the stateful process function processes the same incoming messages
   from Kafka again, generating exactly the same in-flight messages
   again (we use deterministic IDs for these messages and we see the
   same ID being generated over and over)
 * a checkpoint is saved with more in-flight messages; the map function
   is slow and hence doesn't catch up
 * the application crashes again on the same input
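For comparison, here is a toy model of a simpler variant of the loop above, where no in-flight messages need to be stored in a checkpoint at all. It is plain Java, not Flink code, and every name in it is hypothetical. It assumes that no checkpoint completes between a restart and the next crash, and that the at-least-once sink writes each record as soon as it is produced. Under those assumptions, every message between the last checkpointed offset and the malformed message is emitted once per restart.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy model of a crash/restart loop with an at-least-once sink.
// This is NOT Flink code; it only mirrors the replay-from-checkpoint idea.
final class RestartLoopSimulation {

    // Replays the log from the checkpointed offset on every attempt and
    // "crashes" when it reaches the poison (malformed) offset.
    static List<String> run(List<String> log, int checkpointedOffset,
                            int poisonOffset, int attempts) {
        List<String> sink = new ArrayList<>();
        for (int attempt = 0; attempt < attempts; attempt++) {
            // Restore: the source rewinds to the last checkpointed offset.
            int offset = checkpointedOffset;
            while (offset < log.size()) {
                if (offset == poisonOffset) {
                    break; // deserialization fails, the job crashes
                }
                // Deterministic output ID, written to the sink immediately.
                sink.add("out-" + log.get(offset));
                offset++;
            }
            // Assumption: no checkpoint completed before the crash,
            // so checkpointedOffset stays where it was.
        }
        return sink;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "c", "POISON", "d");
        List<String> sink = run(log, 1, 3, 5);
        System.out.println(sink.size());                        // prints 10
        System.out.println(Collections.frequency(sink, "out-b")); // prints 5
    }
}
```

In this model the number of copies of each output equals the number of restarts, which matches what we observed, but I don't know whether it is what actually happened in our job.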

Are in-flight messages stored in a checkpoint somehow? Is the above scenario even possible? (From reading the design of Flink I would think not, but then I have no other explanation.) We have had this once before (that time it was a crash in another branch of the same dataflow).

Greetings,
Frank


