I don't know why yet, but I did figure it out. After my long-running sample map/reduce test ran fine all night, I tried a ton of things. It turns out there is a difference between env.execute() and DataSet.collect().

My flow read from HDFS, decrypted, processed, and finally wrote back to HDFS; at each step, though, I split off the feed and counted stats for saving later. I was executing with collect() on the stat feeds, unioned together, to bring them back to the client so I could determine the validity of my run before doing other things. It looks like collect() was causing the disconnections. When I switched to writing the stats out to HDFS files and calling env.execute(), the flow works fine now. The change was roughly like the sketch below.
Oh, and thank you for the retry suggestion. I turned it on and watched the job fail 3 times in a row with the same error, so the retry stuff works, which is cool, and I'll use it from now on! For anyone following along, I set it up roughly like the snippet below. (Btw, the docs need updating here, since that stuff is deprecated: https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/fault_tolerance.html)
Thank you all as always for being so responsive!

On Fri, Jun 22, 2018 at 5:26 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Garrett,
>
> have you set a restart strategy for your job [1]? In order to recover from
> failures you need to specify one. Otherwise Flink will terminally fail the
> job in case of a failure.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/restart_strategies.html
>
> Cheers,
> Till
>
> On Thu, Jun 21, 2018 at 7:43 PM Garrett Barton <garrett.bar...@gmail.com>
> wrote:
>
>> Actually, random thought, could YARN preemption be causing this? What is
>> the failure scenario should a working task manager that is doing real
>> work go down in YARN? The docs make it sound like it should fire up
>> another TM and get back to work out of the box, but I'm not seeing that.
>>
>> On Thu, Jun 21, 2018 at 1:20 PM Garrett Barton <garrett.bar...@gmail.com>
>> wrote:
>>
>>> Thank you all for the reply!
>>>
>>> I am running batch jobs. I read in a handful of files from HDFS and
>>> output to HBase, HDFS, and Kafka. I run into this when I have partial
>>> usage of the cluster as the job runs. Right now I spin up 20 nodes with
>>> 3 slots each; my job at peak uses all 60 slots, but by the end of it,
>>> since my outputs are all forced to parallelism 1 while I work out kinks,
>>> it all typically ends up running in one or two task managers tops. The
>>> other 18-19 task managers die off. The problem is that as soon as any
>>> task manager dies off, my client throws the above exception and the job
>>> fails.
>>>
>>> I cannot share logs, but I was thinking about writing a dirt-simple
>>> map/reduce flow based on the wordcount example. The example would have a
>>> wide map phase that generates data, and then I'd run it through a
>>> reducer that sleeps maybe 1 second every record. I believe that will
>>> simulate my condition very well, where I go from 100% used slots to only
>>> 1-2 used slots as I hit that timeout. I'll do that today and let you
>>> know; if it works I can share the code in here as an example.
>>>
>>> On Thu, Jun 21, 2018 at 5:01 AM Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Hi Garrett,
>>>>
>>>> killing of idle TaskManagers should not affect the execution of the
>>>> job. By definition, a TaskManager only idles if it does not execute any
>>>> tasks. Could you maybe share the complete logs (of the cluster
>>>> entrypoint and all TaskManagers) with us?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Jun 21, 2018 at 10:26 AM Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Garrett,
>>>>>
>>>>> I agree, there seems to be an issue, and increasing the timeout should
>>>>> not be the right approach to solve it.
>>>>> Are you running streaming or batch jobs, i.e., do some of the tasks
>>>>> finish much earlier than others?
>>>>>
>>>>> I'm adding Till to this thread; he's very familiar with scheduling and
>>>>> process communication.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2018-06-19 0:03 GMT+02:00 Garrett Barton <garrett.bar...@gmail.com>:
>>>>>
>>>>>> Hey all,
>>>>>>
>>>>>> My jobs that I am trying to write in Flink 1.5 are failing after a
>>>>>> few minutes. I think it's because the idle task managers are shutting
>>>>>> down, which seems to kill the client and the running job. The running
>>>>>> job itself was still going on one of the other task managers. I get:
>>>>>>
>>>>>> org.apache.flink.client.program.ProgramInvocationException:
>>>>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>>>>> Connection unexpectedly closed by remote task manager 'xxxx'. This
>>>>>> might indicate that the remote task manager was lost.
>>>>>> at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:143)
>>>>>>
>>>>>> Now I happen to have the last part of the flow set to parallelism 1
>>>>>> right now for debugging, so of the 4 task managers that are spun up,
>>>>>> 3 of them hit the timeout period (currently set to 240000). I think
>>>>>> as soon as the first one goes, the client throws up and the whole job
>>>>>> dies as a result.
>>>>>>
>>>>>> Is this expected behavior, and if so, is there another way around it?
>>>>>> Do I keep increasing the slotmanager.taskmanager-timeout to a really,
>>>>>> really large number? I have verified that setting the timeout to
>>>>>> 840000 lets the job complete without error.
>>>>>>
>>>>>> Thank you!