Hello,
Due to a malformed message in our input queue (Kafka), our
DeserializationSchema threw an exception, which crashed our Flink
application. Since the application was configured to restart, it restarted,
only to reprocess the same malformed message and crash again.
This went on for a while until we fixed the job and made the
DeserializationSchema return null on malformed messages. After that, we
restarted the job from the last successful savepoint (taken just before the
malformed message) and got a huge number of duplicate messages
generated by our processors (it seems one message for every time Flink
restarted).
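
To make the fix concrete, here is a minimal sketch of the idea in plain Java. It is not our actual code: the class name, the Event type and the "<id>:<payload>" wire format are all made up for illustration, and the method only stands in for the body of a deserialize(byte[]) implementation so that it runs without Flink on the classpath.

```java
import java.nio.charset.StandardCharsets;

// Plain-Java stand-in for the body of a deserialize(byte[]) method.
// The "<id>:<payload>" wire format and all names here are hypothetical.
final class TolerantDecoder {

    static final class Event {
        final long id;
        final String payload;

        Event(long id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    // Returns null for anything that cannot be parsed, instead of throwing.
    static Event decode(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        String text = new String(bytes, StandardCharsets.UTF_8);
        int sep = text.indexOf(':');
        if (sep <= 0) {
            return null; // no separator, or empty id
        }
        try {
            long id = Long.parseLong(text.substring(0, sep));
            return new Event(id, text.substring(sep + 1));
        } catch (NumberFormatException e) {
            return null; // id is not a number
        }
    }
}
```

The point is only that a bad record yields null rather than an exception, so one poison message can no longer put the job into a restart loop.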
In addition to this, one of our map functions had side effects and these
side effects were also executed a huge number of times. We basically
have this topology:
kafka source --> stateful process function with event time timers -->
(slow) map function with side effect (post to external API) --> kafka
sink (at least once)
We don't use the exactly-once sink (we use at-least-once instead),
because an occasional duplicate would not harm us and we need low
latency. However, a massive number of duplicates is a problem.
So I'm trying to understand how checkpoints and savepoints really work and
in what situation we could end up with a massive number of duplicates.
The only explanation I could think of is the following scenario:
* the application starts up
* the stateful process function processes some incoming messages from
Kafka and generates some outgoing messages
* the slow map function starts processing these messages, and at the
same time a checkpoint is saved (somehow without the new Kafka offsets?)
* the application crashes on the malformed input
then again:
* application restarts
* the stateful process function processes the same incoming
messages from Kafka again, generating exactly the same in-flight messages
again (we use deterministic IDs for these messages and we see the
same ID being generated over and over).
* a checkpoint is saved with more in-flight messages; the map function
is slow and hence doesn't catch up
* the application crashes again on the same input.
Are in-flight messages stored in a checkpoint somehow? Is the above
scenario even possible? (From reading the design of Flink I would think not,
but then I have no other explanation.) We have seen this once before
(that time it was a crash in another branch of the same dataflow).
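
For comparison, here is a toy model (plain Java, not Flink code) of what I would expect from ordinary at-least-once replay, assuming no checkpoint completes between crashes. The class, the method names and the "msg-<offset>" ID scheme are invented for the example. Every attempt starts again at the last checkpointed offset, re-emits everything up to the malformed message, and then crashes.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a crash loop with an at-least-once sink.
// All names and the "msg-<offset>" ID scheme are hypothetical.
final class ReplayModel {

    // Offsets [checkpointOffset, poisonOffset) are processed and written to
    // the sink on every attempt; the job then crashes at poisonOffset.
    // Assumes no checkpoint completes, so each restart resumes at
    // checkpointOffset again.
    static List<String> run(int checkpointOffset, int poisonOffset, int restarts) {
        List<String> sink = new ArrayList<>();
        for (int attempt = 0; attempt <= restarts; attempt++) {
            for (int offset = checkpointOffset; offset < poisonOffset; offset++) {
                sink.add("msg-" + offset); // deterministic ID, same on every attempt
            }
            // crash on the malformed message at poisonOffset
        }
        return sink;
    }

    // Counts how many times a given ID reached the sink.
    static long copiesOf(List<String> sink, String id) {
        return sink.stream().filter(id::equals).count();
    }
}
```

With three offsets between the checkpoint and the malformed message and four restarts, ReplayModel.run(10, 13, 4) puts 15 records in the sink, five copies of each ID. That would match the "one message per restart" pattern without any in-flight messages being stored in the checkpoint, but I don't know whether this is really what happened in our case.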
Greetings,
Frank