Hi, 

I am running a Flink batch job on a standalone cluster (16 nodes), on top of 
Hadoop. 
The job chain looks like this:

DataSet1 = env.readTextFile (csv on hdfs)
.map
.flatMap
.groupBy
.reduce
.map
.writeAsCsv (DataSet 1)

DataSet2 = env.readTextFile 
.map
.flatMap

env.readCsvFile (DataSet1)
DataSet1.flatJoin(DataSet2)
.groupBy
.reduce
.filter
.count

The job finishes successfully with a DataSource of ~24.3 G.
When I scale to a larger input (244.3 G), it fails with:

java.lang.Exception: TaskManager was lost/killed: 3c0d5310e30f7c52eae95ff97bee85e2 @ grisou-46.nancy.grid5000.fr (dataPort=51359)
        at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
        at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
        at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
        at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
        at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1131)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:44)
        at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
        at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
        at akka.actor.ActorCell.invoke(ActorCell.scala:486)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


The next job submission after a cluster restart produces this error, which I guess is 
caused by the lost TaskManager:

java.lang.IllegalStateException: Update task on TaskManager 12c673eb2681eb4f1d1cb000e561f1c5 @ grisou-48.nancy.grid5000.fr (dataPort=42326) failed due to:
        at org.apache.flink.runtime.executiongraph.Execution$8.apply(Execution.java:1076)
        at org.apache.flink.runtime.executiongraph.Execution$8.apply(Execution.java:1073)
        at org.apache.flink.runtime.concurrent.impl.FlinkFuture$3.recover(FlinkFuture.java:201)
        at akka.dispatch.Recover.internal(Future.scala:268)
        at akka.dispatch.japi$RecoverBridge.apply(Future.scala:184)
        at akka.dispatch.japi$RecoverBridge.apply(Future.scala:182)
        at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
        at scala.util.Try$.apply(Try.scala:192)
        at scala.util.Failure.recover(Try.scala:216)
        at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
        at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://fl...@grisou-48.nancy.grid5000.fr:49630/user/taskmanager#-1925734821]] after [10000 ms]
        at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
        at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
        at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
        at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
        at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
        at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
        at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
        at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
        ... 1 more 
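
The "after [10000 ms]" in the trace above appears to match the 10 s default of 
Flink's akka.ask.timeout setting. As a rough sketch only, these are the 
flink-conf.yaml entries that I assume are relevant here. The key names are as I 
understand them from the Flink 1.x configuration docs and may differ in other 
versions; the values are illustrative guesses, not settings taken from this 
cluster:

    # flink-conf.yaml (illustrative values, assumptions only)
    # Timeout for JobManager <-> TaskManager ask calls; the default is 10 s
    akka.ask.timeout: 60 s
    # JVM heap size per TaskManager, in megabytes
    taskmanager.heap.mb: 8192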


According to the web monitor, there are 0 finished tasks after the 
DataSource -> Map -> Map -> Map -> FlatMap -> Reduce chain. 
It also looks like the lost/killed TaskManager is not restarting. 

Thank you for any help on this issue, 

Regards  
Oleksandra
