Hi,

I am trying to get the frequency of each Unicode char in a document
collection using Spark. Here is the code snippet that does the job:

        // Read (offset, line) records from the SequenceFile and repartition.
        JavaPairRDD<LongWritable, Text> rows =
                sc.sequenceFile(args[0], LongWritable.class, Text.class);
        rows = rows.coalesce(10000);

        // For each row, count its characters locally and emit (char, count) pairs.
        JavaPairRDD<Character, Long> pairs = rows.flatMapToPair(t -> {
            String content = t._2.toString();
            Multiset<Character> chars = HashMultiset.create();
            for (int i = 0; i < content.length(); i++)
                chars.add(content.charAt(i));
            List<Tuple2<Character, Long>> list =
                    new ArrayList<Tuple2<Character, Long>>();
            for (Character ch : chars.elementSet()) {
                list.add(new Tuple2<Character, Long>(ch, (long) chars.count(ch)));
            }
            return list;
        });

        // Sum the per-row counts across the whole collection.
        JavaPairRDD<Character, Long> counts = pairs.reduceByKey((a, b) -> a + b);
        System.out.printf("MapCount %,d\n", counts.count());

But I get the following exception:

15/01/03 21:51:34 ERROR MapOutputTrackerMasterActor: Map output statuses were 11141547 bytes which exceeds spark.akka.frameSize (10485760 bytes).
org.apache.spark.SparkException: Map output statuses were 11141547 bytes which exceeds spark.akka.frameSize (10485760 bytes).
        at org.apache.spark.MapOutputTrackerMasterActor$$anonfun$receiveWithLogging$1.applyOrElse(MapOutputTracker.scala:59)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
        at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.spark.MapOutputTrackerMasterActor.aroundReceive(MapOutputTracker.scala:42)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Would you please tell me where the fault is?
If I process fewer rows, there is no problem; however, when the number of
rows is large, I always get this exception.
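
In case it helps, this is roughly how I create the context. I have not yet
tried raising spark.akka.frameSize (the property named in the error) or
using fewer partitions; the snippet below is only a sketch of what I think
those changes would look like (the app name, the 128 MB frame size, and the
coalesce(200) value are just guesses on my part):

        SparkConf conf = new SparkConf()
                .setAppName("CharFrequency")          // placeholder name
                .set("spark.akka.frameSize", "128");  // in MB; the default is 10
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Far fewer partitions than 10000, so the map output statuses the
        // driver ships around stay well below the frame-size limit.
        JavaPairRDD<LongWritable, Text> rows =
                sc.sequenceFile(args[0], LongWritable.class, Text.class)
                  .coalesce(200);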

Thanks in advance.


