Hi,

Thanks for the prompt reply. I checked the code; the main issue is the large number of mappers. If the number of mappers is set to around 1,000, there is no problem. I hope the bug gets fixed in a future release.
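For reference, a rough sketch of the change that works on my side (it assumes the same `sc` and `args` as in the snippet quoted below; 1000 is just the partition count that happened to work here, not a tuned value):

    // Same job as in the quoted snippet, but coalesced to ~1000 partitions
    // instead of 10000. Fewer map partitions means fewer map output statuses,
    // so their serialized size stays under the default 10 MB Akka frame size.
    JavaPairRDD<LongWritable, Text> rows = sc.sequenceFile(args[0],
            LongWritable.class, Text.class);
    rows = rows.coalesce(1000);
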
On Mon, Jan 5, 2015 at 1:26 AM, Josh Rosen <rosenvi...@gmail.com> wrote:

> Ah, so I guess this *is* still an issue, since we needed to use a bitmap
> for tracking zero-sized blocks (see
> https://issues.apache.org/jira/browse/SPARK-3740; this isn't just a
> performance issue; it's necessary for correctness). This will require a
> bit more effort to fix, since we'll either have to find a way to use a
> fixed-size / capped-size encoding for MapOutputStatuses (which might
> require changes to let us fetch empty blocks safely) or figure out some
> other strategy for shipping these statuses.
>
> I've filed https://issues.apache.org/jira/browse/SPARK-5077 to try to
> come up with a proper fix. In the meantime, I recommend that you increase
> your Akka frame size.
>
> On Sat, Jan 3, 2015 at 8:51 PM, Saeed Shahrivari <
> saeed.shahriv...@gmail.com> wrote:
>
>> I use the 1.2 version.
>>
>> On Sun, Jan 4, 2015 at 3:01 AM, Josh Rosen <rosenvi...@gmail.com> wrote:
>>
>>> Which version of Spark are you using? It seems like the issue here is
>>> that the map output statuses are too large to fit in the Akka frame size.
>>> This issue has been fixed in Spark 1.2 by using a different encoding for
>>> map outputs for jobs with many reducers (
>>> https://issues.apache.org/jira/browse/SPARK-3613). On earlier Spark
>>> versions, your options are either reducing the number of reducers (e.g. by
>>> explicitly specifying the number of reducers in the reduceByKey() call)
>>> or increasing the Akka frame size (via the spark.akka.frameSize
>>> configuration option).
>>>
>>> On Sat, Jan 3, 2015 at 10:40 AM, Saeed Shahrivari <
>>> saeed.shahriv...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to get the frequency of each Unicode char in a document
>>>> collection using Spark. Here is the code snippet that does the job:
>>>>
>>>> JavaPairRDD<LongWritable, Text> rows = sc.sequenceFile(args[0],
>>>>         LongWritable.class, Text.class);
>>>> rows = rows.coalesce(10000);
>>>>
>>>> JavaPairRDD<Character, Long> pairs = rows.flatMapToPair(t -> {
>>>>     String content = t._2.toString();
>>>>     Multiset<Character> chars = HashMultiset.create();
>>>>     for (int i = 0; i < content.length(); i++)
>>>>         chars.add(content.charAt(i));
>>>>     List<Tuple2<Character, Long>> list =
>>>>             new ArrayList<Tuple2<Character, Long>>();
>>>>     for (Character ch : chars.elementSet()) {
>>>>         list.add(new Tuple2<Character, Long>(ch, (long) chars.count(ch)));
>>>>     }
>>>>     return list;
>>>> });
>>>>
>>>> JavaPairRDD<Character, Long> counts = pairs.reduceByKey((a, b) -> a + b);
>>>> System.out.printf("MapCount %,d\n", counts.count());
>>>>
>>>> But I get the following exception:
>>>>
>>>> 15/01/03 21:51:34 ERROR MapOutputTrackerMasterActor: Map output statuses
>>>> were 11141547 bytes which exceeds spark.akka.frameSize (10485760 bytes).
>>>> org.apache.spark.SparkException: Map output statuses were 11141547 bytes
>>>> which exceeds spark.akka.frameSize (10485760 bytes).
>>>>     at org.apache.spark.MapOutputTrackerMasterActor$$anonfun$receiveWithLogging$1.applyOrElse(MapOutputTracker.scala:59)
>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>>     at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
>>>>     at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>>     at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>     at org.apache.spark.MapOutputTrackerMasterActor.aroundReceive(MapOutputTracker.scala:42)
>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>> Would you please tell me where the fault is?
>>>> If I process fewer rows, there is no problem. However, when the number
>>>> of rows is large, I always get this exception.
>>>>
>>>> Thanks in advance.
>>>>
>>>
>>
>
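(For anyone hitting this on a pre-1.2 release: a minimal sketch of the two workarounds Josh mentions above. The frame size value, the reducer count, and the application name are illustrative assumptions, not values from the original job; `pairs` is the RDD built in the quoted snippet.)

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    // Workaround 1: raise the Akka frame size. The value is in MB; the
    // default is 10, which is what the 10485760-byte limit in the error
    // corresponds to.
    SparkConf conf = new SparkConf()
            .setAppName("CharCount")                    // hypothetical app name
            .set("spark.akka.frameSize", "64");          // illustrative value
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Workaround 2: cap the number of reducers explicitly. Each map output
    // status records a size per reducer, so fewer reducers means smaller
    // serialized statuses.
    JavaPairRDD<Character, Long> counts =
            pairs.reduceByKey((a, b) -> a + b, 200);     // 200 is illustrative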