Hello,
A quick update: we found the cause. It had nothing to do with Flink; it
was a bug in our own code.
We used CEP to detect when senders stopped sending messages (basically,
we relied on the timeout of a CEP pattern). When generating these timeout
messages, we used the different timestamps inconsistently, which
produced out-of-order messages that were then mishandled in a subsequent
step.
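A minimal sketch of this kind of bug, in plain Java without any Flink
dependency. The thread does not say which timestamps were mixed; the
sketch assumes, purely for illustration, that some timeout messages were
stamped with the event time at which the timeout fired and others with
the time of the sender's last message. The class name, TIMEOUT_MS and
the sample values are all hypothetical.

```java
import java.util.List;

public class TimeoutTimestampSketch {
    // Hypothetical pattern timeout, in milliseconds of event time.
    static final long TIMEOUT_MS = 5_000L;

    /** Event time at which the timeout for a sender fires. */
    static long firingTime(long lastEventTs) {
        return lastEventTs + TIMEOUT_MS;
    }

    /** True if the timestamps never decrease in emission order. */
    static boolean isInOrder(List<Long> emitted) {
        for (int i = 1; i < emitted.size(); i++) {
            if (emitted.get(i) < emitted.get(i - 1)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long lastA = 1_000L; // last message seen from sender A
        long lastB = 3_000L; // last message seen from sender B

        // Inconsistent: A's timeout carries the firing time,
        // B's timeout carries the last-message time.
        List<Long> mixed = List.of(firingTime(lastA), lastB);

        // Consistent: both timeouts carry the firing time.
        List<Long> consistent = List.of(firingTime(lastA), firingTime(lastB));

        System.out.println(isInOrder(mixed));      // prints "false"
        System.out.println(isInOrder(consistent)); // prints "true"
    }
}
```

Whichever timestamp is chosen, the point is to derive it the same way
for every timeout message, so that downstream event-time logic sees a
monotonic sequence.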
Frank
On 09.02.22 17:03, Frank Dekervel wrote:
Hello,
Due to a malformed message in our input queue (Kafka), our
DeserializationSchema threw an exception, which crashed our Flink
application. Since the application was configured to restart, it
restarted, only to reprocess the same malformed message and crash again.
This went on for a while until we fixed the job and made the
DeserializationSchema return null on malformed messages. After that,
we restarted the job from the last successful savepoint (taken just
before the malformed message) and got a huge number of duplicate
messages from our processors (seemingly one for every time Flink
restarted).
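The "return null on malformed input" fix can be sketched as follows, in
plain Java rather than against the real Flink interface. The payload
format (a UTF-8 decimal number) and the names deserializeOrNull and
consume are assumptions for illustration; consume only stands in for a
source that skips records for which the schema returned null.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TolerantDeserializerSketch {

    /** Parses the payload, returning null instead of throwing on bad input. */
    static Long deserializeOrNull(byte[] raw) {
        if (raw == null) {
            return null;
        }
        try {
            return Long.parseLong(new String(raw, StandardCharsets.UTF_8).trim());
        } catch (NumberFormatException e) {
            // Malformed message: signal "skip this record" rather than crash.
            return null;
        }
    }

    /** Stand-in for the source loop: records that deserialize to null are dropped. */
    static List<Long> consume(List<byte[]> records) {
        List<Long> out = new ArrayList<>();
        for (byte[] raw : records) {
            Long value = deserializeOrNull(raw);
            if (value != null) {
                out.add(value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<byte[]> records = List.of(
                "1".getBytes(StandardCharsets.UTF_8),
                "oops".getBytes(StandardCharsets.UTF_8), // malformed
                "3".getBytes(StandardCharsets.UTF_8));
        System.out.println(consume(records)); // prints "[1, 3]"
    }
}
```

Silently dropping records hides data problems, so in practice it is
probably worth at least logging or counting the skipped messages.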
In addition to this, one of our map functions had side effects and
these side effects were also executed a huge number of times. We
basically have this topology:
kafka source --> stateful process function with event time timers -->
(slow) map function with side effect (post to external API) --> kafka
sink (at least once)
We don't use the exactly-once sink (we use at-least-once instead),
because an occasional duplicate would not harm us and we need low
latency. However, a massive number of duplicates is a problem.
So I'm trying to understand how checkpoints and savepoints really work,
and in which situations we could end up with a massive number of
duplicates. The only explanation I could think of is the following scenario:
* the application starts up
* the stateful process function processes some incoming messages from
Kafka and generates some outgoing messages
* the slow map function starts processing these messages, and at the
same time a checkpoint is saved (somehow without new kafka offsets
???)
* the application crashes on the malformed input
then again:
* application restarts
* the stateful process function processes the same incoming
messages from Kafka again, generating exactly the same in-flight
messages again (we use deterministic IDs for these messages and we
see the same ID being generated over and over).
* a checkpoint is saved with more in-flight messages; the map
function is slow and so doesn't catch up
* the application crashes again on the same input.
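The effect of such a restart loop under at-least-once processing can be
sketched with a toy model in plain Java. This is not how Flink is
implemented; it only assumes that every restart resumes from the same
checkpointed offset, that no newer checkpoint completes before the
crash, and that the side effect runs for every message processed before
the malformed one. All names and values are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReplaySketch {

    /**
     * Replays the log from the checkpointed offset until the poison
     * message is hit, once for the initial run and once per restart.
     * Returns how many times the side effect ran for each message.
     */
    static Map<String, Integer> run(List<String> log, int checkpointedOffset,
                                    String poison, int restarts) {
        Map<String, Integer> sideEffects = new HashMap<>();
        for (int attempt = 0; attempt <= restarts; attempt++) {
            for (int i = checkpointedOffset; i < log.size(); i++) {
                String msg = log.get(i);
                if (msg.equals(poison)) {
                    break; // crash: offsets are not advanced, next attempt replays
                }
                sideEffects.merge(msg, 1, Integer::sum); // e.g. post to external API
            }
        }
        return sideEffects;
    }

    public static void main(String[] args) {
        List<String> log = List.of("m1", "m2", "m3", "BAD", "m4");
        // Last checkpoint covered m1; the job crashes on BAD and restarts 3 times.
        Map<String, Integer> counts = run(log, 1, "BAD", 3);
        System.out.println(counts.get("m2"));             // prints "4"
        System.out.println(counts.get("m3"));             // prints "4"
        System.out.println(counts.getOrDefault("m1", 0)); // prints "0"
        System.out.println(counts.getOrDefault("m4", 0)); // prints "0"
    }
}
```

In this model every message between the last checkpoint and the
malformed one is emitted once per attempt, which would match seeing the
same deterministic ID once per restart.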
Are in-flight messages stored in a checkpoint somehow? Is the above
scenario even possible? (From reading the design of Flink I would think
not, but I have no other explanation.) We have seen this once before
(that time it was a crash in another branch of the same dataflow).
Greetings,
Frank