Hi,

Any news? It may be caused by an oversized akka payload; the log contains many entries like: akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@172.21.125.20:39449/user/jobmanager#-1264474132]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage was 69074412 bytes
How do I set akka's maximum-payload-bytes in my Flink cluster? https://issues.apache.org/jira/browse/FLINK-2373 is not clear about that. I do not use ExecutionEnvironment.createRemoteEnvironment() but ExecutionEnvironment.getExecutionEnvironment(). Do I have to change the way I'm doing things? How?

Thanks,
Arnaud

-----Original Message-----
From: LINZ, Arnaud
Sent: Wednesday, 30 November 2016 08:59
To: user@flink.apache.org
Subject: RE: Collect() freeze on yarn cluster on strange recover/deserialization error

Hi,

Don't think so. I always delete the ZK path before launching the batch (with /usr/bin/zookeeper-client -server $FLINK_HA_ZOOKEEPER_SERVERS rmr $FLINK_HA_ZOOKEEPER_PATH_BATCH), and the "recovery" log line appears only before the collect() phase, not at the beginning. The full log is available here: https://ftpext.bouyguestelecom.fr/?u=JDhCUdcAImsANZQdys86yID6UNq8H2r

Thanks,
Arnaud

-----Original Message-----
From: Ufuk Celebi [mailto:u...@apache.org]
Sent: Tuesday, 29 November 2016 18:43
To: LINZ, Arnaud <al...@bouyguestelecom.fr>; user@flink.apache.org
Subject: Re: Collect() freeze on yarn cluster on strange recover/deserialization error

Hey Arnaud,

could this be a leftover job that is recovered from ZooKeeper? Recovery only happens if the configured ZK root contains data. A job is removed from ZooKeeper only when it terminates (e.g. finishes, fails terminally without restarting, or is cancelled). If you just shut the cluster down, this is treated as a failure. The complete JM logs will be helpful to check further what's happening there.

– Ufuk

On 29 November 2016 at 18:15:16, LINZ, Arnaud (al...@bouyguestelecom.fr) wrote:
> Hello,
>
> I have a Flink 1.1.3 batch application that makes a simple aggregation but freezes when collect() is called, when the app is deployed on an HA-enabled YARN cluster (it works on a local cluster).
> Just before it hangs, I have the following deserialization error in the logs:
>
> (...)
> 2016-11-29 15:10:10,422 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSink (collect()) (1/4) (10cae0de2f4e7b6d71f21209072f7c96) switched from DEPLOYING to RUNNING
> 2016-11-29 15:10:13,175 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce (Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (2/4) (c098cf691c28364ca47d322c7a76259a) switched from RUNNING to FINISHED
> 2016-11-29 15:10:17,816 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce (Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (1/4) (aa6953c3c3a7c9d06ff714e13d020e38) switched from RUNNING to FINISHED
> 2016-11-29 15:10:38,060 INFO org.apache.flink.yarn.YarnJobManager - Attempting to recover all jobs.
> 2016-11-29 15:10:38,167 ERROR org.apache.flink.yarn.YarnJobManager - Fatal error: Failed to recover jobs.
> java.io.StreamCorruptedException: invalid type code: 00
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at java.util.HashMap.readObject(HashMap.java:1184)
>     at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:58)
>     at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
>     at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraphs(ZooKeeperSubmittedJobGraphStore.java:173)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply$mcV$sp(JobManager.scala:530)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply$mcV$sp(JobManager.scala:526)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Do you have an idea of what can be wrong? I have no problems with other batch applications, just with this one.
> Why is it trying to recover the jobs in the first place?
>
> Thanks,
> Arnaud
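For anyone hitting the same OversizedPayloadException: Flink exposes the akka frame size as the cluster-wide akka.framesize option (default 10485760b) in flink-conf.yaml; the same key can reportedly also be passed client-side via the Configuration overload of ExecutionEnvironment.createRemoteEnvironment() that FLINK-2373 introduced. A minimal sketch, assuming the akka.framesize key and an illustrative 100 MB value (it must exceed the ~69 MB payload reported in the log above):

```yaml
# flink-conf.yaml -- a sketch, not a tested configuration.
# The value must be larger than the biggest message the JobManager
# ships; the log above reports a 69,074,412-byte payload, so 100 MB
# (104857600 bytes) is used here for headroom.
akka.framesize: 104857600b
```

The setting is read when the JobManager and TaskManagers start, so a YARN session would need to be restarted after changing it. Since collect() funnels the entire result through the JobManager's actor system, writing large results to a sink (e.g. writeAsText()) instead of collecting them avoids the limit altogether.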