Hi,

I did some tests and it turns out I was really overloading the cluster, which
caused the problems. I tried the timeout setting but that didn't help.
Simply 'not overloading' the system did help.

Thanks.

Niels

On Thu, Oct 12, 2017 at 10:42 AM, Ufuk Celebi <u...@apache.org> wrote:
> Hey Niels,
>
> Flink currently restarts the complete job if you have a restart
> strategy configured:
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/restart_strategies.html
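>
> For illustration only (a generic sketch, not taken from your job; the
> numbers are arbitrary), a fixed-delay restart strategy can be set directly
> on the environment:
>
>     import org.apache.flink.api.common.restartstrategy.RestartStrategies;
>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     // Restart the job up to 3 times, waiting 10000 ms between attempts.
>     env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));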
>
> I agree that only restarting the required parts of the pipeline is an
> important optimization. Flink has not implemented this (fully) yet, but
> it's on the agenda [1] and work has already started [2].
>
> In this particular case, everything is just slow and we don't need the
> restart at all if you give the consumer a higher max timeout.
>
> Please report back when you have more info :-)
>
> – Ufuk
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures
> [2] https://issues.apache.org/jira/browse/FLINK-4256
>
> On Thu, Oct 12, 2017 at 10:17 AM, Niels Basjes <ni...@basjes.nl> wrote:
> > Hi,
> >
> > I'm currently doing some tests to see if this info helps.
> > I was running a different high-CPU task on one of the nodes outside Yarn,
> > so I took that one out of the cluster to see if that helps.
> >
> > What I do find strange is that in this kind of error scenario the entire
> > job fails.
> > I would have expected something similar to 'good old' MapReduce: the
> > missing task is simply resubmitted and run again.
> > Why doesn't that happen?
> >
> > Niels
> >
> > On Wed, Oct 11, 2017 at 8:49 AM, Ufuk Celebi <u...@apache.org> wrote:
> >> Hey Niels,
> >>
> >> any update on this?
> >>
> >> – Ufuk
> >>
> >> On Mon, Oct 9, 2017 at 10:16 PM, Ufuk Celebi <u...@apache.org> wrote:
> >> > Hey Niels,
> >> >
> >> > thanks for the detailed report. I don't think that it is related to
> >> > the Hadoop or Scala version. I think the following happens:
> >> >
> >> > - Occasionally, one of your tasks seems to be extremely slow in
> >> > registering its produced intermediate result (the data shuffled
> >> > between TaskManagers).
> >> > - Another task is already requesting to consume data from this task
> >> > but cannot find it (after multiple retries) and it fails the complete
> >> > job (your stack trace).
> >> >
> >> > That this happens only occasionally is probably due to load on your
> >> > cluster. The slowdown could have multiple reasons...
> >> > - Is your Hadoop cluster resource-constrained so that the tasks are
> >> > slow to deploy?
> >> > - Is your application JAR very large so that it needs a lot of time
> >> > to download?
> >> >
> >> > We have two options at this point:
> >> > 1) You can increase the maximum retries via the config option
> >> > "taskmanager.network.request-backoff.max". The default is 10000
> >> > (milliseconds) and specifies what the maximum request back-off is [1].
> >> > Increasing this to 30000 would give you two extra retries with pretty
> >> > long delays (see [1]).
> >> >
> >> > 2) To be sure that this is really what is happening, we could increase
> >> > the log level of certain classes and check whether they have
> >> > registered their results or not. If you want to do this, I'm more than
> >> > happy to provide you with some classes to enable DEBUG logging for.
> >> >
> >> > What do you think?
> >> >
> >> > – Ufuk
> >> >
> >> > DETAILS
> >> > =======
> >> >
> >> > - The TaskManagers produce and consume intermediate results.
> >> > - When a TaskManager wants to consume a result, it directly queries
> >> > the producing TaskManager for it.
> >> > - An intermediate result becomes ready for consumption during initial
> >> > task setup (state DEPLOYING).
> >> > - When a TaskManager is slow to register its intermediate result and
> >> > the consumer requests the result before it is ready, it can happen
> >> > that a requested partition is "not found".
> >> >
> >> > This is what is also happening here. We retry the request for the
> >> > intermediate result multiple times with timed back-off [1] and only
> >> > fail the request (your stack trace) if the partition is still not
> >> > ready although we expect it to be ready (that is, there was no failure
> >> > at the producing task).
> >> >
> >> > [1] Starting by default at 100 millis and going up to 10000 millis by
> >> > doubling that time (100, 200, 400, 800, 1600, 3200, 6400, 10000)
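> >> >
> >> > For reference, option 1 would be a single line in conf/flink-conf.yaml,
> >> > picked up when the yarn-session is started (30000 is just the example
> >> > value from above):
> >> >
> >> >     taskmanager.network.request-backoff.max: 30000
> >> >
> >> > and option 2 would be an extra logger in conf/log4j.properties, roughly
> >> > along these lines (the package is only a placeholder until we know which
> >> > classes matter):
> >> >
> >> >     log4j.logger.org.apache.flink.runtime.io.network=DEBUG
> >> >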
> >> > On Mon, Oct 9, 2017 at 10:51 AM, Niels Basjes <ni...@basjes.nl> wrote:
> >> >> Hi,
> >> >>
> >> >> I'm having some trouble running a Java-based Flink job in a
> >> >> yarn-session.
> >> >>
> >> >> The job itself consists of reading a set of files resulting in a
> >> >> DataStream (I use DataStream because in the future I intend to replace
> >> >> the files with a Kafka feed), then does some parsing and eventually
> >> >> writes the data into HBase.
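> >> >>
> >> >> In outline the pipeline is roughly the following (a simplified sketch;
> >> >> the parser and the HBase sink stand in for my actual classes):
> >> >>
> >> >>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> >> >>     env.readTextFile("hdfs:///...")          // input files, later to become a Kafka source
> >> >>        .map(new MyParser())                  // the parsing step (placeholder MapFunction)
> >> >>        .addSink(new MyHBaseSinkFunction());  // writes the parsed records into HBase (placeholder SinkFunction)
> >> >>     env.execute("parse files into HBase");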
> >> >>
> >> >> Most of the time running this works fine, yet sometimes it fails with
> >> >> this exception:
> >> >>
> >> >> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition 794b5ce385c296b7943fa4c1f072d6b9@13aa7ef02a5d9e0898204ec8ce283363 not found.
> >> >>     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:203)
> >> >>     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:128)
> >> >>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:345)
> >> >>     at org.apache.flink.runtime.taskmanager.Task.onPartitionStateUpdate(Task.java:1286)
> >> >>     at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1123)
> >> >>     at org.apache.flink.runtime.taskmanager.Task$2.apply(Task.java:1118)
> >> >>     at org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
> >> >>     at akka.dispatch.OnComplete.internal(Future.scala:248)
> >> >>     at akka.dispatch.OnComplete.internal(Future.scala:245)
> >> >>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
> >> >>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
> >> >>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> >> >>     at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> >> >>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
> >> >>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> >> >>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> >> >>     at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> >> >>     at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
> >> >>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> >> >>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> >> >>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >> >>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >> >>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> >>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >> >>
> >> >> I went through all logs on the Hadoop side of all the related containers,
> >> >> and other than this exception I did not see any warning/error that might
> >> >> explain what is going on here.
> >> >>
> >> >> Now the "most of the time running this works fine" makes this hard to
> >> >> troubleshoot. When I run the same job again it may run perfectly that
> >> >> time.
> >> >>
> >> >> I'm using flink-1.3.2-bin-hadoop27-scala_2.11.tgz and I double-checked
> >> >> my pom.xml: I use the same Flink and Scala versions there.
> >> >>
> >> >> The command used to start the yarn-session on my experimental cluster
> >> >> (no security, no other users):
> >> >>
> >> >> /usr/local/flink-1.3.2/bin/yarn-session.sh \
> >> >>     --container 180 \
> >> >>     --name "Flink on Yarn Experiments" \
> >> >>     --slots 1 \
> >> >>     --jobManagerMemory 4000 \
> >> >>     --taskManagerMemory 4000 \
> >> >>     --streaming \
> >> >>     --detached
> >> >>
> >> >> Two relevant fragments from my application pom.xml:
> >> >>
> >> >> <flink.version>1.3.2</flink.version>
> >> >> <flink.scala.version>2.11</flink.scala.version>
> >> >>
> >> >> <dependency>
> >> >>   <groupId>org.apache.flink</groupId>
> >> >>   <artifactId>flink-java</artifactId>
> >> >>   <version>${flink.version}</version>
> >> >> </dependency>
> >> >>
> >> >> <dependency>
> >> >>   <groupId>org.apache.flink</groupId>
> >> >>   <artifactId>flink-streaming-java_${flink.scala.version}</artifactId>
> >> >>   <version>${flink.version}</version>
> >> >> </dependency>
> >> >>
> >> >> <dependency>
> >> >>   <groupId>org.apache.flink</groupId>
> >> >>   <artifactId>flink-clients_${flink.scala.version}</artifactId>
> >> >>   <version>${flink.version}</version>
> >> >> </dependency>
> >> >>
> >> >> <dependency>
> >> >>   <groupId>org.apache.flink</groupId>
> >> >>   <artifactId>flink-hbase_${flink.scala.version}</artifactId>
> >> >>   <version>${flink.version}</version>
> >> >> </dependency>
> >> >>
> >> >> I could really use some suggestions on where to look for the root cause
> >> >> of this.
> >> >> Is this something in my application? My Hadoop cluster? Or is this a
> >> >> problem in Flink 1.3.2?
> >> >>
> >> >> Thanks.
> >> >>
> >> >> --
> >> >> Best regards / Met vriendelijke groeten,
> >> >>
> >> >> Niels Basjes
> >
> > --
> > Best regards / Met vriendelijke groeten,
> >
> > Niels Basjes

--
Best regards / Met vriendelijke groeten,

Niels Basjes