Hi,

you could also try increasing the heartbeat timeout via `akka.watch.heartbeat.pause`. Maybe this helps to overcome the GC pauses.
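For example, in flink-conf.yaml you could set something along these lines (the concrete values below are only an illustration and need to be adapted to the pause lengths you actually see in your GC logs):

    akka.watch.heartbeat.pause: 120 s
    akka.ask.timeout: 120 s

Regarding the hourly System.gc() calls: the one-hour interval looks like it could come from a library (RMI's distributed GC, for example, runs on that schedule by default), so this is only a guess, but with G1 you could try adding -XX:+ExplicitGCInvokesConcurrent to `env.java.opts` so that System.gc() starts a concurrent cycle instead of a stop-the-world full collection, and a larger -XX:MetaspaceSize should make the "Full GC (Metadata GC Threshold)" collections less frequent.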
Cheers,
Till

On Wed, Nov 29, 2017 at 12:41 PM, T Obi <t....@geniee.co.jp> wrote:
> The Datanode warnings did not appear in all of the timeout cases. They
> seem to be raised only by timeouts during snapshotting.
>
> We output GC logs on the taskmanagers and found that something kicks
> System.gc() every hour.
> So a full GC runs every hour, and it takes about a minute or more in
> our cases...
> When a taskmanager is timed out, a full GC always seems to be running
> on it. The full GCs are not only triggered by System.gc(), though, but
> also show up as "Full GC (Ergonomics)" and "Full GC (Metadata GC
> Threshold)".
>
> Some of our jobs have a large state. I think this is why the full GC
> takes such a long time.
> I will try running a few taskmanagers with the memory divided between
> them on each machine.
> I will also tune the JVM memory parameters to reduce the frequency of
> "Full GC (Metadata GC Threshold)".
>
> Best,
> Tetsuya
>
>
> 2017-11-28 16:30 GMT+09:00 T Obi <t....@geniee.co.jp>:
> > Hello Chesnay,
> >
> > Thank you for answering my rough question.
> >
> > Not all taskmanagers are quarantined at a time, but each taskmanager
> > has been quarantined at least once.
> >
> > We are using CDH 5.8, which is based on Hadoop 2.6.
> > We had not paid attention to the datanodes; we will check them.
> > However, we also use this HDFS for MapReduce and it seems to work fine.
> >
> > I searched the archives of this mailing list for the keyword "Detected
> > unreachable" and found mails about GC trouble.
> > Although we are using G1GC, we will try to output GC logs.
> >
> >
> > Best,
> > Tetsuya
> >
> > 2017-11-28 1:15 GMT+09:00 Chesnay Schepler <ches...@apache.org>:
> >> Are only some taskmanagers quarantined, or all of them?
> >>
> >> Do the quarantined taskmanagers have anything in common?
> >> (are the failing ones always on certain machines; do the stacktraces
> >> reference the same HDFS datanodes)
> >>
> >> Which Hadoop version are you using?
> >>
> >> From the stack trace it appears that multiple HDFS datanodes are
> >> failing.
> >> The taskmanagers time out because their connection to ZooKeeper breaks
> >> down, at which point a taskmanager no longer knows who the leading
> >> jobmanager is and subsequently shuts down.
> >>
> >>
> >> On 27.11.2017 08:02, T Obi wrote:
> >>>
> >>> Hello all,
> >>>
> >>> We run jobs on a standalone cluster with Flink 1.3.2 and we are
> >>> facing a problem. Suddenly the connection between a taskmanager and
> >>> the jobmanager times out and the taskmanager is "quarantined" by the
> >>> jobmanager.
> >>> Once a taskmanager is quarantined the jobs are of course restarted,
> >>> but the timeout and quarantine then keep happening to one
> >>> taskmanager after another.
> >>>
> >>> When a taskmanager's connection to the jobmanager timed out, its
> >>> connections to ZooKeeper and to the snapshot HDFS also timed out. So
> >>> the problem does not seem to be in Flink itself.
> >>> But even when a taskmanager running on the same machine as the
> >>> jobmanager times out, the jobmanager itself is fine at that time. So
> >>> I do not think it is an OS problem either.
> >>>
> >>> Could you give us some advice on how to investigate? Thank you.
> >>>
> >>>
> >>>
> >>> Taskmanager command line:
> >>>
> >>> java -XX:+UseG1GC -Xms219136M -Xmx219136M
> >>> -XX:MaxDirectMemorySize=8388607T
> >>> -Dlog.file=/var/log/flink/flink-log-manager-taskmanager-0-flink-jp-2.log
> >>> -Dlog4j.configuration=file:/opt/flink/flink-1.3.2/conf/log4j.properties
> >>> -Dlogback.configurationFile=file:/opt/flink/flink-1.3.2/conf/logback.xml
> >>> -classpath
> >>> /opt/flink/flink-1.3.2/lib/flink-python_2.11-1.3.2.jar:/opt/flink/flink-1.3.2/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/flink-1.3.2/lib/log4j-1.2.17.jar:/opt/flink/flink-1.3.2/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/flink-1.3.2/lib/flink-dist_2.11-1.3.2.jar:::
> >>> org.apache.flink.runtime.taskmanager.TaskManager --configDir
> >>> /opt/flink/flink-1.3.2/conf
> >>>
> >>>
> >>> Taskmanager (on flink-jp-2) log:
> >>>
> >>> 2017-11-22 14:09:31,595 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Heap backend snapshot (File Stream Factory @ hdfs://nameservice1/user/log-manager/flink/checkpoints-data/9469db324b834e9dcf5b46428b3ae011, synchronous part) in thread Thread[TriggerWindow(TumblingProcessingTimeWindows(60000), ReducingStateDescriptor{serializer=jp.geniee.reporter.executable.BuyerReporterV2Auction$$anon$12$$anon$7@d2619591, reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@72bca894}, ProcessingTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Map -> Map (9/30),5,Flink Task Threads] took 142 ms.
> >>> 2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient - DFSOutputStream ResponseProcessor exception for block BP-390359345-10.5.0.29-1476670682927:blk_1194079870_620518999
> >>> java.io.EOFException: Premature EOF: no length prefix available
> >>>         at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
> >>>         at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
> >>>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
> >>> 2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient - DFSOutputStream ResponseProcessor exception for block BP-390359345-10.5.0.29-1476670682927:blk_1194080159_621053744
> >>> java.io.EOFException: Premature EOF: no length prefix available
> >>>         at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
> >>>         at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
> >>>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
> >>> 2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient - DFSOutputStream ResponseProcessor exception for block BP-390359345-10.5.0.29-1476670682927:blk_1194080160_620520092
> >>> java.io.EOFException: Premature EOF: no length prefix available
> >>>         at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
> >>>         at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
> >>>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
> >>> 2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient - DFSOutputStream ResponseProcessor exception for block BP-390359345-10.5.0.29-1476670682927:blk_1194079071_620517393
> >>> java.io.EOFException: Premature EOF: no length prefix available
> >>>         at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
> >>>         at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
> >>>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
> >>> 2017-11-22 14:12:10,041 WARN  org.apache.hadoop.hdfs.DFSClient - Error Recovery for block BP-390359345-10.5.0.29-1476670682927:blk_1194079071_620517393 in pipeline 10.5.0.61:50010, 10.5.0.59:50010, 10.5.0.74:50010: bad datanode 10.5.0.61:50010
> >>> 2017-11-22 14:12:10,039 WARN  org.apache.hadoop.hdfs.DFSClient - Error Recovery for block BP-390359345-10.5.0.29-1476670682927:blk_1194080160_620520092 in pipeline 10.5.0.59:50010, 10.5.0.52:50010, 10.5.0.63:50010: bad datanode 10.5.0.59:50010
> >>> 2017-11-22 14:12:10,038 WARN  org.apache.hadoop.hdfs.DFSClient - Error Recovery for block BP-390359345-10.5.0.29-1476670682927:blk_1194080159_621053744 in pipeline 10.5.0.52:50010, 10.5.0.78:50010: bad datanode 10.5.0.52:50010
> >>> 2017-11-22 14:12:10,029 INFO  org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 73797ms for sessionid 0x35f5cb4184700a4, closing socket connection and attempting reconnect
> >>> 2017-11-22 14:12:10,057 WARN  org.apache.hadoop.hdfs.DFSClient - Error Recovery for block BP-390359345-10.5.0.29-1476670682927:blk_1194079870_620518999 in pipeline 10.5.0.69:50010, 10.5.0.59:50010, 10.5.0.74:50010: bad datanode 10.5.0.69:50010
> >>> 2017-11-22 14:12:10,113 WARN  akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@flink-jp-2:43139]
> >>> 2017-11-22 14:12:10,142 INFO  org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionStateManager - State change: SUSPENDED
> >>> 2017-11-22 14:12:10,142 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
> >>> 2017-11-22 14:12:10,157 INFO  org.apache.flink.runtime.taskmanager.TaskManager - TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://flink@flink-jp-2:43139/user/jobmanager: JobManager is no longer reachable
> >>> 2017-11-22 14:12:10,158 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Cancelling all computations and discarding all cached data.
> >>>
> >>>
> >>>
> >>> Jobmanager command line:
> >>>
> >>> java -Xms8192m -Xmx8192m
> >>> -Dlog.file=/var/log/flink/flink-log-manager-jobmanager-0-flink-jp-2.log
> >>> -Dlog4j.configuration=file:/opt/flink/flink-1.3.2/conf/log4j.properties
> >>> -Dlogback.configurationFile=file:/opt/flink/flink-1.3.2/conf/logback.xml
> >>> -classpath
> >>> /opt/flink/flink-1.3.2/lib/flink-python_2.11-1.3.2.jar:/opt/flink/flink-1.3.2/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/flink-1.3.2/lib/log4j-1.2.17.jar:/opt/flink/flink-1.3.2/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/flink-1.3.2/lib/flink-dist_2.11-1.3.2.jar:::
> >>> org.apache.flink.runtime.jobmanager.JobManager --configDir
> >>> /opt/flink/flink-1.3.2/conf --executionMode cluster --host flink-jp-2
> >>> --webui-port 8081
> >>>
> >>>
> >>> Jobmanager (on flink-jp-2) log:
> >>>
> >>> 2017-11-22 14:09:32,252 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 293 (125180549 bytes in 889 ms).
> >>> 2017-11-22 14:12:02,705 WARN  akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@flink-jp-2:42609]
> >>> 2017-11-22 14:12:02,705 INFO  org.apache.flink.runtime.jobmanager.JobManager - Task manager akka.tcp://flink@flink-jp-2:42609/user/taskmanager terminated.
> >>> 2017-11-22 14:12:02,705 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: lamp-auction-test -> Flat Map -> Map -> Sink: 2017-11-22-auc-log (30/30) (a853390bb17f6d58997ad994266d3df2) switched from RUNNING to FAILED.
> >>> java.lang.Exception: TaskManager was lost/killed: d51c4d252a8c1ff222b728ca50dbe55a @ flink-jp-2 (dataPort=37930)
> >>>         at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
> >>>         at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
> >>>         at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
> >>>         at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
> >>>         at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
> >>>         at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
> >>>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1131)
> >>>         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> >>>         at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
> >>>         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> >>>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> >>>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> >>>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> >>>         at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> >>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> >>>         at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
> >>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> >>>         at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:44)
> >>>         at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
> >>>         at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
> >>>         at akka.actor.ActorCell.invoke(ActorCell.scala:486)
> >>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> >>>         at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> >>>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> >>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >>>
> >>>
> >>>
> >>> Best,
> >>> Tetsuya
> >>>
> >>
>