Re: Odd job failure

2018-05-29 Thread Piotr Nowojski
Hi,

Could you post the full output of the mvn dependency:tree command for your project?
Can you reproduce this issue with a minimal project stripped of any 
custom code/external dependencies except Flink itself?
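
(If the project is built with sbt rather than Maven, which the eviction
warnings quoted below suggest, the equivalent of mvn dependency:tree would be
something along these lines; the plugin version is an assumption on my side,
not something taken from your build:

  // project/plugins.sbt
  addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.2")

  // then, from the project root:
  //   sbt dependencyTree
)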

Thanks, Piotrek

> On 28 May 2018, at 20:13, Elias Levy  wrote:
> 
> On Mon, May 28, 2018 at 1:48 AM, Piotr Nowojski wrote:
> The most likely suspect is the standard Java problem of a dependency 
> convergence issue. Please check that you are not pulling multiple Kafka 
> versions into your class path. In particular, your job shouldn’t pull in any 
> Kafka library except the one that comes from flink-connector-kafka-0.11 (which is 
> 0.11.0.2).
> 
> Alas, that is not the case.  The job correctly includes 
> kafka-clients:0.11.0.2:
> 
> [warn] Found version conflict(s) in library dependencies; some are suspected 
> to be binary incompatible:
> [warn] 
> [warn]* org.apache.kafka:kafka-clients:0.11.0.2 is selected over 
> {0.10.2.1, 0.9.0.1}
> [warn]+- org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.2 
> (depends on 0.11.0.2)
> [warn]+- org.apache.flink:flink-connector-kafka-0.9_2.11:1.4.2 
> (depends on 0.10.2.1)
> [warn]+- org.apache.flink:flink-connector-kafka-0.10_2.11:1.4.2 
> (depends on 0.10.2.1)
> [warn] 
> 
>  
> Please also consider upgrading your cluster at least to Kafka 0.11.0.2. Kafka 
> 0.11.0.0 was a pretty unstable release, and we do not support it. Our connector 
> depends on the Kafka 0.11.0.2 client, and while I don’t assume that there is some 
> incompatibility between a 0.11.0.0 cluster and a 0.11.0.2 client, it definitely 
> wouldn’t hurt to upgrade the cluster.
> 
> Thanks for the tip.  That said, this error should be unrelated to the version 
> of the cluster.
> 



Re: Odd job failure

2018-05-28 Thread Elias Levy
On Mon, May 28, 2018 at 1:48 AM, Piotr Nowojski 
wrote:

> The most likely suspect is the standard Java problem of a dependency
> convergence issue. Please check that you are not pulling multiple Kafka
> versions into your class path. In particular, your job shouldn’t pull in any
> Kafka library except the one that comes from flink-connector-kafka-0.11 (which
> is 0.11.0.2).
>

Alas, that is not the case.  The job correctly includes kafka-clients:
0.11.0.2:

[warn] Found version conflict(s) in library dependencies; some are
suspected to be binary incompatible:
[warn]
[warn]  * org.apache.kafka:kafka-clients:0.11.0.2 is selected over
{0.10.2.1, 0.9.0.1}
[warn]  +- org.apache.flink:flink-connector-kafka-0.11_2.11:1.4.2
(depends on 0.11.0.2)
[warn]  +- org.apache.flink:flink-connector-kafka-0.9_2.11:1.4.2
(depends on 0.10.2.1)
[warn]  +- org.apache.flink:flink-connector-kafka-0.10_2.11:1.4.2
(depends on 0.10.2.1)
[warn]
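
(Another way to double-check which kafka-clients jar actually ends up on the
class path at runtime is to ask a client class where it was loaded from; a
small sketch, to be run from inside the job or a debug session:

  // Prints the jar that the Kafka client classes were actually loaded from.
  val kafkaClientsSource = Class
    .forName("org.apache.kafka.clients.consumer.KafkaConsumer")
    .getProtectionDomain
    .getCodeSource
  println(if (kafkaClientsSource == null) "bootstrap/unknown"
          else kafkaClientsSource.getLocation)
)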




> Please also consider upgrading your cluster at least to Kafka 0.11.0.2.
> Kafka 0.11.0.0 was a pretty unstable release, and we do not support it. Our
> connector depends on the Kafka 0.11.0.2 client, and while I don’t assume that
> there is some incompatibility between a 0.11.0.0 cluster and a 0.11.0.2 client,
> it definitely wouldn’t hurt to upgrade the cluster.
>

Thanks for the tip.  That said, this error should be unrelated to the
version of the cluster.


Re: Odd job failure

2018-05-28 Thread Piotr Nowojski
Hi,

I think that’s unlikely to happen. As far as I know, the only way to actually 
unload classes in the JVM is when their class loader is garbage collected, 
which means all references in the code to it must vanish. In other words, 
it should never happen that a class is not found while anyone is still 
referencing it.
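
(For what it is worth, here is a minimal sketch of the lingering-thread
scenario described further down in this thread: a thread that outlives its
class loader hits this kind of error the first time it touches a class, for
example an anonymous inner class, that had not been loaded before the loader
was closed. The jar path and class names below are made up, and this is not
Flink's actual class-loading code:

  import java.net.{URL, URLClassLoader}

  object ClassUnloadSketch {
    def main(args: Array[String]): Unit = {
      // Hypothetical jar containing com.example.Outer (a Runnable) and its
      // anonymous inner class com.example.Outer$1; neither is on the
      // application class path.
      val userCodeLoader =
        new URLClassLoader(Array(new URL("file:///tmp/user-code.jar")), null)

      // Load Outer through the "user code" loader and start a thread using it.
      val outer = Class.forName("com.example.Outer", true, userCodeLoader)
      val lingering = new Thread(
        outer.getDeclaredConstructor().newInstance().asInstanceOf[Runnable])
      lingering.start()

      // The "job" is now cancelled and its class loader is closed, but the
      // thread above was never shut down.
      userCodeLoader.close()

      // If Outer.run() only now references Outer$1 for the first time, the
      // JVM has to resolve it through the closed loader and the thread dies
      // with java.lang.NoClassDefFoundError: com/example/Outer$1, the same
      // shape of error as org/apache/kafka/clients/NetworkClient$1 here.
      lingering.join()
    }
  }
)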

The most likely suspect is the standard Java problem of a dependency convergence 
issue. Please check that you are not pulling multiple Kafka versions into your 
class path. In particular, your job shouldn’t pull in any Kafka library except 
the one that comes from flink-connector-kafka-0.11 (which is 0.11.0.2).
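
(If the job is built with sbt, which the eviction warnings quoted elsewhere in
this thread suggest, something along these lines would keep a single client
version on the class path; the dependency set and the use of
dependencyOverrides below are assumptions about the build, not taken from the
actual job:

  // build.sbt (sketch)
  val flinkVersion = "1.4.2"

  libraryDependencies ++= Seq(
    // Ideally only the 0.11 connector is declared, so that only
    // kafka-clients 0.11.0.2 is pulled in transitively.
    "org.apache.flink" %% "flink-connector-kafka-0.11" % flinkVersion
  )

  // If the older connectors cannot be dropped, pin a single kafka-clients version.
  dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "0.11.0.2"
)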

Please also consider upgrading your cluster at least to Kafka 0.11.0.2. Kafka 
0.11.0.0 was a pretty unstable release, and we do not support it. Our connector 
depends on the Kafka 0.11.0.2 client, and while I don’t assume that there is some 
incompatibility between a 0.11.0.0 cluster and a 0.11.0.2 client, it definitely 
wouldn’t hurt to upgrade the cluster.

Piotrek

> On 26 May 2018, at 17:58, Elias Levy  wrote:
> 
> Piotr & Stephan,
> 
> Thanks for the replies.  Apologies for the late response.  I've been 
> traveling for the past month.
> 
> We've not observed this issue (spilling) again, but it is good to know that 
> 1.5 will use back-pressure based alignment.  I think for now we'll leave 
> task.checkpoint.alignment.max-size as is and work towards moving to 1.5 once 
> we confirm it is stable.
> 
> As for the java.lang.NoClassDefFoundError: 
> org/apache/kafka/clients/NetworkClient$1 error: we see that one constantly 
> when jobs are canceled/restarted/upgraded.  We are using the 
> flink-connector-kafka-0.11 connector against a 0.11.0.0 cluster.  The error 
> indicates to me that the Kafka threads are not being fully shut down and they 
> are trying to reload the NetworkClient class but failing, maybe because the 
> code is no longer accessible via the class loader or some other reason.  
> 
> It looks like others are observing the same error.  Alexander Smirnov 
> reported it here on the list last month as well.
> 
> 
> On Thu, May 3, 2018 at 1:22 AM, Stephan Ewen wrote:
> Hi Elias!
> 
> Concerning the spilling of alignment data to disk:
> 
>   - In 1.4.x, you can set an upper limit via 
> "task.checkpoint.alignment.max-size". See [1].
>   - In 1.5.x, the default is a back-pressure based alignment, which does not 
> spill any more.
> 
> Best,
> Stephan
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#task-checkpoint-alignment-max-size
> 
> 
> On Wed, May 2, 2018 at 1:37 PM, Piotr Nowojski wrote:
> Hi,
> 
> It might be some Kafka issue. 
> 
> From what you described your reasoning seems sound. For some reason TM3 fails 
> and is unable to restart and process any data, thus forcing spilling on 
> checkpoint barriers on TM1 and TM2.
> 
> I don’t know the reason behind java.lang.NoClassDefFoundError: 
> org/apache/kafka/clients/NetworkClient$1 errors, but it doesn’t seem to be 
> important in this case.
> 
> 1. What Kafka version are you using? Have you looked for any known Kafka 
> issues with those symptoms?
> 2. Maybe the easiest thing will be to reformat/reinstall/recreate TM3 AWS 
> image? It might be some system issue.
> 
> Piotrek
> 
>> On 28 Apr 2018, at 01:54, Elias Levy wrote:
>> 
>> We had a job on a Flink 1.4.2 cluster with three TMs experience an odd 
>> failure the other day.  It seems that it started as some sort of network 
>> event.  
>> 
>> It began with the 3rd TM starting to warn every 30 seconds about socket 
>> timeouts while sending metrics to DataDog.  This lasted for the whole outage.
>> 
>> Twelve minutes later, all TMs reported at nearly the same time that they had 
>> marked the Kafka coordinator as dead ("Marking the coordinator XXX (id: 
>> 2147482640 rack: null) dead for group ZZZ").  The job terminated and the 
>> system attempted to recover it.  Then things got into a weird state.
>> 
>> The following repeated six or seven times over a period of about 40 
>> minutes: 
>> 1. TM attempts to restart the job, but only the first and second TMs show 
>> signs of doing so.  
>> 2. The disk begins to fill up on TMs 1 and 2.  
>> 3. TMs 1 & 2 both report java.lang.NoClassDefFoundError: 
>> org/apache/kafka/clients/NetworkClient$1 errors.  These were mentioned on 
>> this list earlier this month.  It is unclear if they are benign.
>> 4. The job dies when the disks finally fill up on 1 and 2.
>> 
>> Looking at the backtrace logged when the disk fills up, I gather that Flink 
>> is buffering data coming from Kafka into one of my operators as a result of 
>> a barrier.  The job has a two input operator, with one input the 

Re: Odd job failure

2018-05-26 Thread Elias Levy
Piotr & Stephan,

Thanks for the replies.  Apologies for the late response.  I've been
traveling for the past month.

We've not observed this issue (spilling) again, but it is good to know that
1.5 will use back-pressure based alignment.  I think for now we'll leave
task.checkpoint.alignment.max-size as is and work towards moving to 1.5
once we confirm it is stable.

As for the java.lang.NoClassDefFoundError: org/apache/kafka/clients/
NetworkClient$1 error: we see that one constantly when jobs are
canceled/restarted/upgraded.  We are using the flink-connector-kafka-0.11
connector against a 0.11.0.0 cluster.  The error indicates to me that the
Kafka threads are not being fully shut down and they are trying to reload
the NetworkClient class but failing, maybe because the code is no longer
accessible via the class loader or some other reason.

It looks like others are observing the same error.  Alexander Smirnov
reported it here on the list last month as well.


On Thu, May 3, 2018 at 1:22 AM, Stephan Ewen  wrote:

> Hi Elias!
>
> Concerning the spilling of alignment data to disk:
>
>   - In 1.4.x, you can set an upper limit via
> "task.checkpoint.alignment.max-size". See [1].
>   - In 1.5.x, the default is a back-pressure based alignment, which does
> not spill any more.
>
> Best,
> Stephan
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#task-checkpoint-alignment-max-size
>
> On Wed, May 2, 2018 at 1:37 PM, Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> It might be some Kafka issue.
>>
>> From what you described your reasoning seems sound. For some reason TM3
>> fails and is unable to restart and process any data, thus forcing spilling
>> on checkpoint barriers on TM1 and TM2.
>>
>> I don’t know the reason behind java.lang.NoClassDefFoundError:
>> org/apache/kafka/clients/NetworkClient$1 errors, but it doesn’t seem to
>> be important in this case.
>>
>> 1. What Kafka version are you using? Have you looked for any known Kafka
>> issues with those symptoms?
>> 2. Maybe the easiest thing will be to reformat/reinstall/recreate TM3 AWS
>> image? It might be some system issue.
>>
>> Piotrek
>>
>> On 28 Apr 2018, at 01:54, Elias Levy  wrote:
>>
>> We had a job on a Flink 1.4.2 cluster with three TMs experience an odd
>> failure the other day.  It seems that it started as some sort of network
>> event.
>>
>> It began with the 3rd TM starting to warn every 30 seconds about socket
>> timeouts while sending metrics to DataDog.  This lasted for the whole
>> outage.
>>
>> Twelve minutes later, all TMs reported at nearly the same time that they
>> had marked the Kafka coordinator as dead ("Marking the coordinator XXX (id:
>> 2147482640 rack: null) dead for group ZZZ").  The job terminated and the
>> system attempted to recover it.  Then things got into a weird state.
>>
>> The following repeated six or seven times over a period of about 40
>> minutes:
>>
>>1. TM attempts to restart the job, but only the first and second TMs
>>show signs of doing so.
>>2. The disk begins to fill up on TMs 1 and 2.
>>3. TMs 1 & 2 both report java.lang.NoClassDefFoundError:
>>org/apache/kafka/clients/NetworkClient$1 errors.  These were
>>    mentioned on this list earlier this month.  It is unclear if they are
>> benign.
>>    4. The job dies when the disks finally fill up on 1 and 2.
>>
>>
>> Looking at the backtrace logged when the disk fills up, I gather that
>> Flink is buffering data coming from Kafka into one of my operators as a
>> result of a barrier.  The job has a two input operator, with one input the
>> primary data, and a secondary input for control commands.  It would appear
>> that for whatever reason the barrier for the control stream is not making
>> it to the operator, thus leading to the buffering and full disks.  Maybe
>> Flink scheduled the operator source of the control stream on the 3rd TM
>> which seems like it was not scheduling tasks?
>>
>> Finally the JM records that it received 13 late messages for already expired
>> checkpoints (could they be from the 3rd TM?), the job is restored one more
>> time and it works.  All TMs report nearly at the same time that they can
>> now find the Kafka coordinator.
>>
>>
>> Seems like the 3rd TM has some connectivity issue, but then all TMs seem
>> to have a problem communicating with the Kafka coordinator at the same time
>> and recovered at the same time.  The TMs are hosted in AWS across AZs, so
>> all of them having connectivity issues at the same time is suspect.  The
>> Kafka node in question was up and other clients in our infrastructure seem
>> to be able to communicate with it without trouble.  Also, the Flink job
>> itself seemed to be talking to the Kafka cluster while restarting as it was
>> spilling data to disk coming from Kafka.  And the JM did not report any
>> reduction on available task slots, which would indicate connectivity issues
>> between 

Re: Odd job failure

2018-05-03 Thread Stephan Ewen
Concerning the connectivity issue - it is hard to say anything more without
any logs or details.

Does the JM log that it is trying to send tasks to the 3rd TM, but the TM
does not show signs of executing them?


On Thu, May 3, 2018 at 10:22 AM, Stephan Ewen  wrote:

> Hi Elias!
>
> Concerning the spilling of alignment data to disk:
>
>   - In 1.4.x, you can set an upper limit via
> "task.checkpoint.alignment.max-size". See [1].
>   - In 1.5.x, the default is a back-pressure based alignment, which does
> not spill any more.
>
> Best,
> Stephan
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#task-checkpoint-alignment-max-size
>
> On Wed, May 2, 2018 at 1:37 PM, Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> It might be some Kafka issue.
>>
>> From what you described your reasoning seems sound. For some reason TM3
>> fails and is unable to restart and process any data, thus forcing spilling
>> on checkpoint barriers on TM1 and TM2.
>>
>> I don’t know the reason behind java.lang.NoClassDefFoundError:
>> org/apache/kafka/clients/NetworkClient$1 errors, but it doesn’t seem to
>> be important in this case.
>>
>> 1. What Kafka version are you using? Have you looked for any known Kafka
>> issues with those symptoms?
>> 2. Maybe the easiest thing will be to reformat/reinstall/recreate TM3 AWS
>> image? It might be some system issue.
>>
>> Piotrek
>>
>> On 28 Apr 2018, at 01:54, Elias Levy  wrote:
>>
>> We had a job on a Flink 1.4.2 cluster with three TMs experience an odd
>> failure the other day.  It seems that it started as some sort of network
>> event.
>>
>> It began with the 3rd TM starting to warn every 30 seconds about socket
>> timeouts while sending metrics to DataDog.  This lasted for the whole
>> outage.
>>
>> Twelve minutes later, all TMs reported at nearly the same time that they
>> had marked the Kafka coordinator as dead ("Marking the coordinator XXX (id:
>> 2147482640 rack: null) dead for group ZZZ").  The job terminated and the
>> system attempted to recover it.  Then things got into a weird state.
>>
>> The following repeated six or seven times over a period of about 40
>> minutes:
>>
>>1. TM attempts to restart the job, but only the first and second TMs
>>show signs of doing so.
>>2. The disk begins to fill up on TMs 1 and 2.
>>3. TMs 1 & 2 both report java.lang.NoClassDefFoundError:
>>org/apache/kafka/clients/NetworkClient$1 errors.  These were
>>    mentioned on this list earlier this month.  It is unclear if they are
>> benign.
>>    4. The job dies when the disks finally fill up on 1 and 2.
>>
>>
>> Looking at the backtrace logged when the disk fills up, I gather that
>> Flink is buffering data coming from Kafka into one of my operators as a
>> result of a barrier.  The job has a two input operator, with one input the
>> primary data, and a secondary input for control commands.  It would appear
>> that for whatever reason the barrier for the control stream is not making
>> it to the operator, thus leading to the buffering and full disks.  Maybe
>> Flink scheduled the operator source of the control stream on the 3rd TM
>> which seems like it was not scheduling tasks?
>>
>> Finally the JM records that it received 13 late messages for already expired
>> checkpoints (could they be from the 3rd TM?), the job is restored one more
>> time and it works.  All TMs report nearly at the same time that they can
>> now find the Kafka coordinator.
>>
>>
>> Seems like the 3rd TM has some connectivity issue, but then all TMs seem
>> to have a problem communicating with the Kafka coordinator at the same time
>> and recovered at the same time.  The TMs are hosted in AWS across AZs, so
>> all of them having connectivity issues at the same time is suspect.  The
>> Kafka node in question was up and other clients in our infrastructure seem
>> to be able to communicate with it without trouble.  Also, the Flink job
>> itself seemed to be talking to the Kafka cluster while restarting as it was
>> spilling data to disk coming from Kafka.  And the JM did not report any
>> reduction on available task slots, which would indicate connectivity issues
>> between the JM and the 3rd TM.  Yet, the logs in the 3rd TM do not show any
>> record of trying to restore the job during the intermediate attempts.
>>
>> What do folks make of it?
>>
>>
>> And a question for Flink devs, is there some reason why Flink does not
>> stop spilling messages to disk when the disk is going to fill up?  Seems
>> like there should be a configurable limit to how much data can be spilled
>> before back-pressure is applied to slow down or stop the source.
>>
>>
>>
>


Re: Odd job failure

2018-05-03 Thread Stephan Ewen
Hi Elias!

Concerning the spilling of alignment data to disk:

  - In 1.4.x, you can set an upper limit via
"task.checkpoint.alignment.max-size". See [1] and the sketch below.
  - In 1.5.x, the default is a back-pressure based alignment, which does
not spill any more.

Best,
Stephan

[1]
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#task-checkpoint-alignment-max-size
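
(For the 1.4.x option above, a concrete flink-conf.yaml entry would look
something like the sketch below. The 128 MB value is purely illustrative, and
my reading of [1] is that exceeding the limit aborts that checkpoint's
alignment rather than spilling further:

  # bytes; -1 (the default) means no limit
  task.checkpoint.alignment.max-size: 134217728
)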

On Wed, May 2, 2018 at 1:37 PM, Piotr Nowojski 
wrote:

> Hi,
>
> It might be some Kafka issue.
>
> From what you described your reasoning seems sound. For some reason TM3
> fails and is unable to restart and process any data, thus forcing spilling
> on checkpoint barriers on TM1 and TM2.
>
> I don’t know the reason behind java.lang.NoClassDefFoundError:
> org/apache/kafka/clients/NetworkClient$1 errors, but it doesn’t seem to
> be important in this case.
>
> 1. What Kafka version are you using? Have you looked for any known Kafka
> issues with those symptoms?
> 2. Maybe the easiest thing will be to reformat/reinstall/recreate TM3 AWS
> image? It might be some system issue.
>
> Piotrek
>
> On 28 Apr 2018, at 01:54, Elias Levy  wrote:
>
> We had a job on a Flink 1.4.2 cluster with three TMs experience an odd
> failure the other day.  It seems that it started as some sort of network
> event.
>
> It began with the 3rd TM starting to warn every 30 seconds about socket
> timeouts while sending metrics to DataDog.  This lasted for the whole
> outage.
>
> Twelve minutes later, all TMs reported at nearly the same time that they
> had marked the Kafka coordinator as dead ("Marking the coordinator XXX (id:
> 2147482640 rack: null) dead for group ZZZ").  The job terminated and the
> system attempted to recover it.  Then things got into a weird state.
>
> The following repeated six or seven times over a period of about 40
> minutes:
>
>1. TM attempts to restart the job, but only the first and second TMs
>show signs of doing so.
>2. The disk begins to fill up on TMs 1 and 2.
>3. TMs 1 & 2 both report java.lang.NoClassDefFoundError:
>org/apache/kafka/clients/NetworkClient$1 errors.  These were mentioned
>    on this list earlier this month.  It is unclear if they are benign.
>    4. The job dies when the disks finally fill up on 1 and 2.
>
>
> Looking at the backtrace logged when the disk fills up, I gather that
> Flink is buffering data coming from Kafka into one of my operators as a
> result of a barrier.  The job has a two input operator, with one input the
> primary data, and a secondary input for control commands.  It would appear
> that for whatever reason the barrier for the control stream is not making
> it to the operator, thus leading to the buffering and full disks.  Maybe
> Flink scheduled the operator source of the control stream on the 3rd TM
> which seems like it was not scheduling tasks?
>
> Finally the JM records that it received 13 late messages for already expired
> checkpoints (could they be from the 3rd TM?), the job is restored one more
> time and it works.  All TMs report nearly at the same time that they can
> now find the Kafka coordinator.
>
>
> Seems like the 3rd TM has some connectivity issue, but then all TMs seem
> to have a problem communicating with the Kafka coordinator at the same time
> and recovered at the same time.  The TMs are hosted in AWS across AZs, so
> all of them having connectivity issues at the same time is suspect.  The
> Kafka node in question was up and other clients in our infrastructure seem
> to be able to communicate with it without trouble.  Also, the Flink job
> itself seemed to be talking to the Kafka cluster while restarting as it was
> spilling data to disk coming from Kafka.  And the JM did not report any
> reduction on available task slots, which would indicate connectivity issues
> between the JM and the 3rd TM.  Yet, the logs in the 3rd TM do not show any
> record of trying to restore the job during the intermediate attempts.
>
> What do folks make of it?
>
>
> And a question for Flink devs, is there some reason why Flink does not
> stop spilling messages to disk when the disk is going to fill up?  Seems
> like there should be a configurable limit to how much data can be spilled
> before back-pressure is applied to slow down or stop the source.
>
>
>


Re: Odd job failure

2018-05-02 Thread Piotr Nowojski
Hi,

It might be some Kafka issue. 

From what you described your reasoning seems sound. For some reason TM3 fails 
and is unable to restart and process any data, thus forcing spilling on 
checkpoint barriers on TM1 and TM2.

I don’t know the reason behind java.lang.NoClassDefFoundError: 
org/apache/kafka/clients/NetworkClient$1 errors, but it doesn’t seem to be 
important in this case.

1. What Kafka version are you using? Have you looked for any known Kafka issues 
with those symptoms?
2. Maybe the easiest thing will be to reformat/reinstall/recreate TM3 AWS 
image? It might be some system issue.

Piotrek

> On 28 Apr 2018, at 01:54, Elias Levy  wrote:
> 
> We had a job on a Flink 1.4.2 cluster with three TMs experience an odd 
> failure the other day.  It seems that it started as some sort of network 
> event.  
> 
> It began with the 3rd TM starting to warn every 30 seconds about socket 
> timeouts while sending metrics to DataDog.  This lasted for the whole outage.
> 
> Twelve minutes later, all TMs reported at nearly the same time that they had 
> marked the Kafka coordinator as dead ("Marking the coordinator XXX (id: 
> 2147482640 rack: null) dead for group ZZZ").  The job terminated and the 
> system attempted to recover it.  Then things got into a weird state.
> 
> The following repeated six or seven times over a period of about 40 
> minutes: 
> 1. TM attempts to restart the job, but only the first and second TMs show 
> signs of doing so.  
> 2. The disk begins to fill up on TMs 1 and 2.  
> 3. TMs 1 & 2 both report java.lang.NoClassDefFoundError: 
> org/apache/kafka/clients/NetworkClient$1 errors.  These were mentioned on 
> this list earlier this month.  It is unclear if they are benign.
> 4. The job dies when the disks finally fill up on 1 and 2.
> 
> Looking at the backtrace logged when the disk fills up, I gather that Flink 
> is buffering data coming from Kafka into one of my operators as a result of a 
> barrier.  The job has a two input operator, with one input the primary data, 
> and a secondary input for control commands.  It would appear that for 
> whatever reason the barrier for the control stream is not making it to the 
> operator, thus leading to the buffering and full disks.  Maybe Flink 
> scheduled the operator source of the control stream on the 3rd TM which seems 
> like it was not scheduling tasks?
> 
> Finally the JM records that it received 13 late messages for already expired 
> checkpoints (could they be from the 3rd TM?), the job is restored one more 
> time and it works.  All TMs report nearly at the same time that they can now 
> find the Kafka coordinator.
> 
> 
> Seems like the 3rd TM has some connectivity issue, but then all TMs seem to 
> have a problem communicating with the Kafka coordinator at the same time and 
> recovered at the same time.  The TMs are hosted in AWS across AZs, so all of 
> them having connectivity issues at the same time is suspect.  The Kafka node 
> in question was up and other clients in our infrastructure seem to be able 
> to communicate with it without trouble.  Also, the Flink job itself seemed to 
> be talking to the Kafka cluster while restarting as it was spilling data to 
> disk coming from Kafka.  And the JM did not report any reduction on available 
> task slots, which would indicate connectivity issues between the JM and the 
> 3rd TM.  Yet, the logs in the 3rd TM do not show any record of trying to 
> restore the job during the intermediate attempts.
> 
> What do folks make of it?
> 
> 
> And a question for Flink devs, is there some reason why Flink does not stop 
> spilling messages to disk when the disk is going to fill up?  Seems like 
> there should be a configurable limit to how much data can be spilled before 
> back-pressure is applied to slow down or stop the source.



Odd job failure

2018-04-27 Thread Elias Levy
We had a job on a Flink 1.4.2 cluster with three TMs experience an odd
failure the other day.  It seems that it started as some sort of network
event.

It began with the 3rd TM starting to warn every 30 seconds about socket
timeouts while sending metrics to DataDog.  This lasted for the whole
outage.

Twelve minutes later, all TMs reported at nearly the same time that they
had marked the Kafka coordinator as dead ("Marking the coordinator XXX (id:
2147482640 rack: null) dead for group ZZZ").  The job terminated and the
system attempted to recover it.  Then things got into a weird state.

The following repeated six or seven times over a period of about 40
minutes:

   1. TM attempts to restart the job, but only the first and second TMs
   show signs of doing so.
   2. The disk begins to fill up on TMs 1 and 2.
   3. TMs 1 & 2 both report java.lang.NoClassDefFoundError:
   org/apache/kafka/clients/NetworkClient$1 errors.  These were mentioned on
   this list earlier this month.  It is unclear if they are benign.
   4. The job dies when the disks finally fill up on 1 and 2.


Looking at the backtrace logged when the disk fills up, I gather that Flink
is buffering data coming from Kafka into one of my operators as a result of
a barrier.  The job has a two input operator, with one input the primary
data, and a secondary input for control commands.  It would appear that for
whatever reason the barrier for the control stream is not making it to the
operator, thus leading to the buffering and full disks.  Maybe Flink
scheduled the operator source of the control stream on the 3rd TM which
seems like it was not scheduling tasks?

Finally the JM records that it received 13 late messages for already expired
checkpoints (could they be from the 3rd TM?), the job is restored one more
time and it works.  All TMs report nearly at the same time that they can
now find the Kafka coordinator.


Seems like the 3rd TM has some connectivity issue, but then all TMs seem
to have a problem communicating with the Kafka coordinator at the same time
and recovered at the same time.  The TMs are hosted in AWS across AZs, so
all of them having connectivity issues at the same time is suspect.  The
Kafka node in question was up and other clients in our infrastructure seem
to be able to communicate with it without trouble.  Also, the Flink job
itself seemed to be talking to the Kafka cluster while restarting as it was
spilling data to disk coming from Kafka.  And the JM did not report any
reduction on available task slots, which would indicate connectivity issues
between the JM and the 3rd TM.  Yet, the logs in the 3rd TM do not show any
record of trying to restore the job during the intermediate attempts.

What do folks make of it?


And a question for Flink devs, is there some reason why Flink does not stop
spilling messages to disk when the disk is going to fill up?  Seems like
there should be a configurable limit to how much data can be spilled before
back-pressure is applied to slow down or stop the source.