Which version of Spark are you using? It looks like the map output statuses are too large to fit in the Akka frame size. This was fixed in Spark 1.2, which uses a more compact encoding of map output statuses for jobs with many reducers (https://issues.apache.org/jira/browse/SPARK-3613). On earlier Spark versions, your options are either to reduce the number of reducers (e.g. by passing an explicit partition count to reduceByKey()) or to increase the Akka frame size (via the spark.akka.frameSize configuration option).
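For example, a rough sketch of both workarounds against your snippet (the partition count of 2000 and the 64 MB frame size below are illustrative values, not tuned recommendations):

    // Option 1: pass an explicit partition count to reduceByKey(),
    // which caps the number of reducers and shrinks the status table:
    JavaPairRDD<Character, Long> counts =
        pairs.reduceByKey((a, b) -> a + b, 2000);

    // Option 2: raise the Akka frame size (value is in MB; the
    // default is 10) on the SparkConf before the SparkContext
    // is created:
    SparkConf conf = new SparkConf()
        .set("spark.akka.frameSize", "64");

Reducing the number of reducers addresses the underlying cause, while raising the frame size only buys headroom and may need to be raised again as the job grows.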
On Sat, Jan 3, 2015 at 10:40 AM, Saeed Shahrivari <saeed.shahriv...@gmail.com> wrote:

> Hi,
>
> I am trying to get the frequency of each Unicode char in a document
> collection using Spark. Here is the code snippet that does the job:
>
>     JavaPairRDD<LongWritable, Text> rows = sc.sequenceFile(args[0],
>             LongWritable.class, Text.class);
>     rows = rows.coalesce(10000);
>
>     JavaPairRDD<Character, Long> pairs = rows.flatMapToPair(t -> {
>         String content = t._2.toString();
>         Multiset<Character> chars = HashMultiset.create();
>         for (int i = 0; i < content.length(); i++)
>             chars.add(content.charAt(i));
>         List<Tuple2<Character, Long>> list =
>                 new ArrayList<Tuple2<Character, Long>>();
>         for (Character ch : chars.elementSet()) {
>             list.add(new Tuple2<Character, Long>(ch, (long) chars.count(ch)));
>         }
>         return list;
>     });
>
>     JavaPairRDD<Character, Long> counts = pairs.reduceByKey((a, b) -> a + b);
>     System.out.printf("MapCount %,d\n", counts.count());
>
> But, I get the following exception:
>
> 15/01/03 21:51:34 ERROR MapOutputTrackerMasterActor: Map output statuses
> were 11141547 bytes which exceeds spark.akka.frameSize (10485760 bytes).
> org.apache.spark.SparkException: Map output statuses were 11141547 bytes
> which exceeds spark.akka.frameSize (10485760 bytes).
>         at org.apache.spark.MapOutputTrackerMasterActor$$anonfun$receiveWithLogging$1.applyOrElse(MapOutputTracker.scala:59)
>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>         at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
>         at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>         at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>         at org.apache.spark.MapOutputTrackerMasterActor.aroundReceive(MapOutputTracker.scala:42)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Would you please tell me where the fault is?
> If I process fewer rows, there is no problem. However, when the number of
> rows is large, I always get this exception.
>
> Thanks beforehand.
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-akka-frameSize-limit-error-tp20955.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.