I don't know why yet, but I did figure it out.  After my sample long-running
map/reduce test ran fine all night, I tried a ton of things.  Turns out
there is a difference between env.execute() and DataSet#collect().

My flow reads from HDFS, decrypts, processes, and finally writes back to
HDFS; at each step, though, I was also splitting the feed and counting stats
to save later.  I was calling collect() on the stat feeds unioned together
to bring them back to the client and check the validity of my run before
doing anything else.  It looks like collect() was causing the disconnections.
When I switched to writing the stats out to HDFS files and calling
env.execute(), the flow works fine now.
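
For anyone who hits the same thing, here is a minimal sketch of the two
patterns with the DataSet API (the data, paths, and names are made up, not
my actual job):

  import org.apache.flink.api.java.DataSet;
  import org.apache.flink.api.java.ExecutionEnvironment;
  import org.apache.flink.core.fs.FileSystem.WriteMode;

  public class StatsSinkExample {
      public static void main(String[] args) throws Exception {
          ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

          // Hypothetical stat feeds produced alongside the main flow.
          DataSet<String> readStats = env.fromElements("read:100");
          DataSet<String> writeStats = env.fromElements("write:90");

          // What I was doing before: collect() ships the unioned stats back
          // to the client and triggers its own execution of the plan.
          // List<String> stats = readStats.union(writeStats).collect();

          // What works for me now: write the stats to HDFS and run the
          // whole plan with a single env.execute().
          readStats.union(writeStats)
                   .writeAsText("hdfs:///tmp/run-stats", WriteMode.OVERWRITE)
                   .setParallelism(1);

          env.execute("stats example");
      }
  }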

Oh, and thank you for the retry suggestion.  I turned it on and watched the
job fail 3 times in a row with the same error, so the retry stuff works,
which is cool, and I'll use it from now on! (Btw, the docs need updating here
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/fault_tolerance.html
since that stuff is deprecated!)
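
In case it helps anyone else, the non-deprecated way to turn retries on in
the job itself (rather than the old setNumberOfExecutionRetries) looks
roughly like this; the attempt count and delay are just example values:

  import java.util.concurrent.TimeUnit;
  import org.apache.flink.api.common.restartstrategy.RestartStrategies;
  import org.apache.flink.api.common.time.Time;
  import org.apache.flink.api.java.ExecutionEnvironment;

  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  // Retry the job up to 3 times, waiting 10 seconds between attempts.
  env.setRestartStrategy(
      RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

You can also set a cluster-wide default in flink-conf.yaml
(restart-strategy: fixed-delay) instead of doing it in code.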


Thank you all as always for being so responsive!

On Fri, Jun 22, 2018 at 5:26 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Garrett,
>
> have you set a restart strategy for your job [1]? In order to recover from
> failures you need to specify one. Otherwise Flink will terminally fail the
> job in case of a failure.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/restart_strategies.html
>
> Cheers,
> Till
>
> On Thu, Jun 21, 2018 at 7:43 PM Garrett Barton <garrett.bar...@gmail.com>
> wrote:
>
>> Actually, random thought: could yarn preemption be causing this?  What is
>> the failure scenario if a task manager that is doing real work goes down in
>> yarn?  The docs make it sound like it should fire up another TM and get back
>> to work out of the box, but I'm not seeing that.
>>
>>
>> On Thu, Jun 21, 2018 at 1:20 PM Garrett Barton <garrett.bar...@gmail.com>
>> wrote:
>>
>>> Thank you all for the reply!
>>>
>>> I am running batch jobs; I read in a handful of files from HDFS and
>>> output to HBase, HDFS, and Kafka.  I run into this when only part of the
>>> cluster is in use as the job runs.  Right now I spin up 20 nodes with 3
>>> slots each, and my job at peak uses all 60 slots, but since my outputs are
>>> all forced to parallelism 1 while I work out kinks, by the end it all
>>> typically runs in one or two task managers tops.  The other 18-19 task
>>> managers die off.  The problem is that as soon as any task manager dies
>>> off, my client throws the above exception and the job fails.
>>>
>>> I cannot share logs, but I was thinking about writing a dirt simple
>>> mapreduce flow based on the wordcount example.  The example would have a
>>> wide map phase that generates data, and then I'd run it through a reducer
>>> that sleeps maybe 1 second every record.  I believe that will simulate my
>>> condition very well where I go from 100% used slots to only 1-2 used slots
>>> as I hit that timeout.  I'll do that today and let you know, if it works I
>>> can share the code in here as an example.
>>>
>>> On Thu, Jun 21, 2018 at 5:01 AM Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Hi Garrett,
>>>>
>>>> killing of idle TaskManager should not affect the execution of the job.
>>>> By definition a TaskManager only idles if it does not execute any tasks.
>>>> Could you maybe share the complete logs (of the cluster entrypoint and all
>>>> TaskManagers) with us?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Jun 21, 2018 at 10:26 AM Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Garrett,
>>>>>
>>>>> I agree, there seems to be an issue and increasing the timeout should
>>>>> not be the right approach to solve it.
>>>>> Are you running streaming or batch jobs, i.e., do some of the tasks
>>>>> finish much earlier than others?
>>>>>
>>>>> I'm adding Till to this thread who's very familiar with scheduling and
>>>>> process communication.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2018-06-19 0:03 GMT+02:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>>>
>>>>>> Hey all,
>>>>>>
>>>>>>  My jobs that I am trying to write in Flink 1.5 are failing after a
>>>>>> few minutes.  I think it's because the idle task managers are shutting
>>>>>> down, which seems to kill the client and the running job.  The running
>>>>>> job itself was still going on one of the other task managers.  I get:
>>>>>>
>>>>>> org.apache.flink.client.program.ProgramInvocationException:
>>>>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>>>>> Connection unexpectedly closed by remote task manager 'xxxx'. This might
>>>>>> indicate that the remote task manager was lost.
>>>>>> at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:143)
>>>>>>
>>>>>> Now I happen to have the last part of the flow set to parallelism 1
>>>>>> right now for debugging, so of the 4 task managers that are spun up, 3
>>>>>> of them hit the timeout period (currently set to 240000).  I think as
>>>>>> soon as the first one goes, the client throws up and the whole job dies
>>>>>> as a result.
>>>>>>
>>>>>>  Is this expected behavior and if so, is there another way around it?
>>>>>> Do I keep increasing the slotmanager.taskmanager-timeout to a really 
>>>>>> really
>>>>>> large number? I have verified setting the timeout to 840000 lets the job
>>>>>> complete without error.
>>>>>>
>>>>>> Thank you!
>>>>>>
>>>>>
>>>>>
