Hi, I am trying to compute the frequency of each Unicode character in a document collection using Spark. Here is the code snippet that does the job:
    import java.util.ArrayList;
    import java.util.List;
    import com.google.common.collect.HashMultiset;
    import com.google.common.collect.Multiset;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.spark.api.java.JavaPairRDD;
    import scala.Tuple2;

    JavaPairRDD<LongWritable, Text> rows =
        sc.sequenceFile(args[0], LongWritable.class, Text.class);
    rows = rows.coalesce(10000);

    JavaPairRDD<Character, Long> pairs = rows.flatMapToPair(t -> {
        String content = t._2.toString();
        // Tally every character in this row with a Guava multiset.
        Multiset<Character> chars = HashMultiset.create();
        for (int i = 0; i < content.length(); i++) {
            chars.add(content.charAt(i));
        }
        // Emit one (character, local count) pair per distinct character.
        List<Tuple2<Character, Long>> list = new ArrayList<>();
        for (Character ch : chars.elementSet()) {
            list.add(new Tuple2<>(ch, (long) chars.count(ch)));
        }
        return list;
    });

    JavaPairRDD<Character, Long> counts = pairs.reduceByKey((a, b) -> a + b);
    System.out.printf("MapCount %,d\n", counts.count());

But I get the following exception:

15/01/03 21:51:34 ERROR MapOutputTrackerMasterActor: Map output statuses were 11141547 bytes which exceeds spark.akka.frameSize (10485760 bytes).
org.apache.spark.SparkException: Map output statuses were 11141547 bytes which exceeds spark.akka.frameSize (10485760 bytes).
    at org.apache.spark.MapOutputTrackerMasterActor$$anonfun$receiveWithLogging$1.applyOrElse(MapOutputTracker.scala:59)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.MapOutputTrackerMasterActor.aroundReceive(MapOutputTracker.scala:42)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Could you please tell me where the fault lies? If I process fewer rows, there is no problem; when the number of rows is large, however, I always get this exception. Thanks in advance.
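P.S. One workaround I am considering is raising the frame size when creating the context. Below is a minimal sketch of what I mean, assuming spark.akka.frameSize is specified in MB (the error reports the default as 10485760 bytes, i.e. 10 MB); the app name and the value 64 are just placeholders:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    // Hypothetical setup: raise the Akka frame size from the default 10 MB
    // to 64 MB so that larger map output status messages fit.
    SparkConf conf = new SparkConf()
        .setAppName("CharFrequency")          // placeholder app name
        .set("spark.akka.frameSize", "64");   // 64 MB instead of the 10 MB default
    JavaSparkContext sc = new JavaSparkContext(conf);

I also wonder whether the coalesce(10000) is part of the problem, since the size of the map output statuses should grow with the number of map tasks times the number of reduce partitions; would coalescing to fewer partitions be the better fix?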