Hello All,

I'm running a simple WordCount example using the quickstart package from Flink (0.10.1) on an input dataset of 500MB. This dataset is a set of randomly generated words of length 8.

Cluster configuration:
Number of machines: 7
Total cores: 25
Memory on each: 64GB

I'm interested in comparing the performance of Batch and Stream modes, so I'm running the WordCount example with a number of iterations (max 10) on datasets of sizes ranging from 100MB to 50GB, consisting of random words of length 4 and 8.

When I ran the experiments in Batch mode, all iterations ran fine, but now I'm stuck in Streaming mode at this error:

Caused by: java.lang.OutOfMemoryError: Java heap space
    at java.util.HashMap.resize(HashMap.java:580)
    at java.util.HashMap.addEntry(HashMap.java:879)
    at java.util.HashMap.put(HashMap.java:505)
    at org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
    at org.apache.flink.streaming.api.operators.StreamGroupedReduce.processElement(StreamGroupedReduce.java:59)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)

I investigated and found two solutions: (1) increasing taskmanager.heap.mb and (2) reducing taskmanager.memory.fraction. Therefore I set taskmanager.heap.mb: 1024 and taskmanager.memory.fraction: 0.5 (default 0.7).

When I run the example with these settings, I lose TaskManagers one by one during the job execution, with the following cause:

Caused by: java.lang.Exception: The slot in which the task was executed has been released. Probably loss of TaskManager 831a72dad6fbb533b193820f45bdc5bc @ vm-10-155-208-138 - 4 slots - URL: akka.tcp://flink@10.155.208.138:42222/user/taskmanager
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:153)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:696)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:100)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
    at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
    at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
    at akka.actor.ActorCell.invoke(ActorCell.scala:486)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    ... 2 more

When I look at the results generated at each TaskManager, they are fine. The logs also don't show any cause for the job to get cancelled.

Could anyone kindly guide me here?

Kind Regards,
Ravinder Kaur
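
P.S. For reference, the two settings mentioned above would look roughly like this as entries in conf/flink-conf.yaml. This is only a sketch of the changed lines with the values quoted in this message; it assumes every other entry in the file is left at its default, and it is not a recommended configuration.

```yaml
# Sketch of the two changed entries only (values as quoted in this message).
# Assumption: all other entries in flink-conf.yaml keep their defaults.

# JVM heap size per TaskManager, in megabytes.
taskmanager.heap.mb: 1024

# Fraction of memory reserved for Flink's managed memory (default 0.7).
taskmanager.memory.fraction: 0.5
```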