We had a job on a Flink 1.4.2 cluster with three TMs experience an odd
failure the other day.  It seems to have started with some sort of network
event.

It began with the 3rd TM starting to warn every 30 seconds about socket
timeouts while sending metrics to DataDog.  These warnings lasted for the
whole outage.

Twelve minutes later, all TMs reported at nearly the same time that they
had marked the Kafka coordinator as dead ("Marking the coordinator XXX (id:
2147482640 rack: null) dead for group ZZZ").  The job terminated and the
system attempted to recover it.  Then things got into a weird state.

The following sequence repeated six or seven times over a period of about
40 minutes:

   1. The system attempts to restart the job, but only the first and
   second TMs show signs of doing so.
   2. The disk begins to fill up on TMs 1 and 2.
   3. TMs 1 & 2 both report java.lang.NoClassDefFoundError:
   org/apache/kafka/clients/NetworkClient$1 errors.  These were mentioned on
   this list earlier this month.  It is unclear if they are benign.
   4. The job dies when the disks finally fill up on TMs 1 and 2.


Looking at the backtrace logged when the disk fills up, I gather that Flink
is buffering data coming from Kafka into one of my operators as a result of
a checkpoint barrier.  The job has a two-input operator, with one input
carrying the primary data and a secondary input carrying control commands.
It would appear that, for whatever reason, the barrier for the control
stream is not making it to the operator, thus leading to the buffering and
the full disks.  Maybe Flink scheduled the source of the control stream on
the 3rd TM, which did not seem to be running any tasks?
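
For concreteness, the shape of the job is roughly the sketch below (the
sources and the control logic are simplified stand-ins for what we actually
run; in the real job both inputs come from Kafka):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
    import org.apache.flink.util.Collector;

    public class TwoInputJobSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000);

            // Both inputs are Kafka topics in the real job; sockets keep the
            // sketch self-contained.
            DataStream<String> primary = env.socketTextStream("localhost", 9000);
            DataStream<String> control = env.socketTextStream("localhost", 9001);

            // Two-input operator: its checkpoint barrier must arrive on BOTH
            // inputs before it can snapshot.  Until the second barrier shows
            // up, records arriving on the already-aligned input are buffered
            // (and spilled to disk).
            primary.connect(control)
                .flatMap(new CoFlatMapFunction<String, String, String>() {
                    private boolean enabled = true;

                    @Override
                    public void flatMap1(String record, Collector<String> out) {
                        if (enabled) {
                            out.collect(record);   // primary data path
                        }
                    }

                    @Override
                    public void flatMap2(String command, Collector<String> out) {
                        enabled = !"pause".equals(command);  // control path
                    }
                })
                .print();

            env.execute("two-input sketch");
        }
    }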

Finally the JM records that it received 13 late messages for already
expired checkpoints (could they be from the 3rd TM?), the job is restored
one more time, and it works.  All TMs report at nearly the same time that
they can now find the Kafka coordinator.


It seems like the 3rd TM had some connectivity issue, but then all TMs
seem to have had a problem communicating with the Kafka coordinator at the
same time and to have recovered at the same time.  The TMs are hosted in
AWS across AZs, so all of them having connectivity issues at the same time
is suspect.  The Kafka node in question was up, and other clients in our
infrastructure seemed to be able to communicate with it without trouble.
Also, the Flink job itself seemed to be talking to the Kafka cluster while
restarting, as it was spilling data coming from Kafka to disk.  And the JM
did not report any reduction in available task slots, which would have
indicated connectivity issues between the JM and the 3rd TM.  Yet the logs
on the 3rd TM do not show any record of trying to restore the job during
the intermediate attempts.

What do folks make of it?


And a question for the Flink devs: is there some reason why Flink does not
stop spilling messages to disk when the disk is about to fill up?  It seems
like there should be a configurable limit to how much data can be spilled
before back-pressure is applied to slow down or stop the source.
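
For context, the only related bound I am aware of is time-based rather than
size-based, namely the checkpoint timeout (sketch below, with made-up
numbers); if there is a size-based knob I have missed, pointers welcome:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointTimeoutSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

            // Trigger a checkpoint every 60 seconds.
            env.enableCheckpointing(60_000);

            // A checkpoint that has not completed after 2 minutes is aborted,
            // which, as far as I understand, also releases whatever was
            // buffered during alignment.  That is a bound on time, though,
            // not on the volume of data spilled to disk in the meantime.
            env.getCheckpointConfig().setCheckpointTimeout(120_000);
        }
    }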
