Thank you very much!



[email protected]

From: Aaron Davidson
Date: 2013-12-25 02:48
To: user
Subject: Re: How to set Akka frame size
The error you're receiving is because the Akka frame size must be a positive 
Java Integer, i.e., less than 2^31. However, the frame size is not intended to 
be nearly the size of the job memory -- it is the smallest unit of data 
transfer that Spark does. In this case, your "task result" size is exceeding 
10MB, which means that returning the results for a single partition of your 
data is >10MB.
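That 2^31 limit is also what produces the negative maxFrameLength in the stack trace below: the frame size is configured in MB and expanded into a byte count that is stored as a 32-bit int, so a large value wraps negative. A minimal sketch of the overflow (the class and method names here are ours, for illustration only):

```java
public class FrameSizeOverflow {
    // spark.akka.frameSize is given in MB; the resulting byte count ends up
    // in a 32-bit int, so anything >= 2048 MB wraps around to a non-positive
    // value and fails the "must be a positive integer" check.
    static int frameSizeBytes(long frameSizeMb) {
        return (int) (frameSizeMb * 1024 * 1024); // truncates to 32 bits
    }

    public static void main(String[] args) {
        System.out.println(frameSizeBytes(10));    // 10485760 -- fine
        System.out.println(frameSizeBytes(15000)); // -1451229184 -- the value in the trace below
    }
}
```

Note that 15000 MB wraps to exactly -1451229184, the number in your IllegalArgumentException.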


It appears that the default JavaWordCount example has a minSplits value of 1 
(ctx.textFile(args[1], 1)). This really means that the number of partitions 
will be max(1, # hdfs blocks in file). If you have an HDFS block of size ~64MB 
and all distinct words, the resulting task set may be around the same size, 
which is >10MB.
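As a rough sketch of that partition math (the real split computation is Hadoop's and depends on the InputFormat; this simplification and the class name are ours), for your 7 GB file with 64 MB blocks:

```java
public class SplitMath {
    // Approximate partition count for ctx.textFile(path, minSplits):
    // at least minSplits, but never fewer than the number of HDFS blocks.
    static long partitions(long fileBytes, long blockBytes, int minSplits) {
        long blocks = (fileBytes + blockBytes - 1) / blockBytes; // ceiling division
        return Math.max(minSplits, blocks);
    }

    public static void main(String[] args) {
        long sevenGb = 7L * 1024 * 1024 * 1024;
        long block   = 64L * 1024 * 1024;
        // minSplits = 1: falls back to the block count (112 partitions of ~64 MB each)
        System.out.println(partitions(sevenGb, block, 1));
        // minSplits = 256: 256 partitions of ~28 MB of input each
        System.out.println(partitions(sevenGb, block, 256));
    }
}
```

With the default minSplits of 1, each task processes a full ~64 MB block, so a task result can easily exceed a 10 MB frame.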


You have two complementary solutions:
1. Increase the value of minSplits to reduce the size of any single TaskResult, e.g., ctx.textFile(args[1], 256).
2. Increase the Akka frame size by a small amount (e.g., to 20-70 MB).
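A sketch of how both fixes might look together (the class name is ours; this assumes the Spark 0.8-era convention of passing spark.* settings as Java system properties before the context is created, and the commented-out lines assume the JavaWordCount setup from the examples):

```java
public class FrameSizeConfig {
    public static void main(String[] args) {
        // In this era of Spark, spark.* settings are read from Java system
        // properties, so they must be set BEFORE the SparkContext is created.
        System.setProperty("spark.akka.frameSize", "20"); // value is in MB, not bytes

        // Then create the context and read with a larger minSplits, e.g.:
        // JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount");
        // JavaRDD<String> lines = ctx.textFile(args[1], 256);
    }
}
```

The frame size only needs to exceed the largest single task result, not the job's total data size, which is why a modest bump is enough.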
Please note that this issue, while annoying, is in good part due to the lack of 
realism of this example. You very rarely call collect() in Spark in actual 
usage, as that will put all your output data on the driver machine. Much more 
likely you'd save to an HDFS file or compute the top 100 words or something 
like that, which would not have this problem.


(One final note about your configuration, the Spark Worker is simply 
responsible for spawning Executors, which do the actual computation. As such, 
it is typical not to change the Worker memory at all [as it needs very little] 
but rather to give the majority of a machine's memory distributed amongst the 
Executors. If each machine has 16 GB of RAM and 4 cores, for example, you might 
set spark.executor.memory between 2 and 3 GB, totaling 8-12 GB used by Spark.)
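The arithmetic above, written out (illustrative only; the helper name is ours, and it assumes one executor per core on the 16 GB / 4-core machine in the example):

```java
public class ExecutorMemoryMath {
    // Total memory handed to Spark when `executors` executors each
    // get `gbPerExecutor` GB (spark.executor.memory).
    static int totalSparkGb(int executors, int gbPerExecutor) {
        return executors * gbPerExecutor;
    }

    public static void main(String[] args) {
        System.out.println(totalSparkGb(4, 2)); // 8  -- low end of the range
        System.out.println(totalSparkGb(4, 3)); // 12 -- high end of the range
    }
}
```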



On Tue, Dec 24, 2013 at 3:58 AM, [email protected] <[email protected]> 
wrote:

Hi everyone,

I have a question about the argument spark.akka.frameSize; its default value is 10 MB.
I ran JavaWordCount reading data from HDFS, on a 7 GB file.
There was an OOM error caused by some task result exceeding the Akka frame size.
But when I set the argument to 1 GB, 2 GB, or 10 GB, it shows me:
ERROR ClusterScheduler: Lost executor 1 on ocnosql84: remote Akka client shutdown
13/12/24 19:41:14 ERROR StandaloneExecutorBackend: Driver terminated or disconnected! Shutting down.

Sometimes it shows me different error info:
[lh1@ocnosql84 src]$ java MyWordCount spark://ocnosql84:7077 hdfs://ocnosql76:8030/user/lh1/cdr_ismp_20130218 15000 1g 120
13/12/24 19:20:33 ERROR Client$ClientActor: Failed to connect to master
org.jboss.netty.channel.ChannelPipelineException: Failed to initialize a pipeline.
        at org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:209)
        at org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:183)
        at akka.remote.netty.ActiveRemoteClient$$anonfun$connect$1.apply$mcV$sp(Client.scala:173)
        at akka.util.Switch.liftedTree1$1(LockUtil.scala:33)
        at akka.util.Switch.transcend(LockUtil.scala:32)
        at akka.util.Switch.switchOn(LockUtil.scala:55)
        at akka.remote.netty.ActiveRemoteClient.connect(Client.scala:158)
        at akka.remote.netty.NettyRemoteTransport.send(NettyRemoteSupport.scala:153)
        at akka.remote.RemoteActorRef.$bang(RemoteActorRefProvider.scala:247)
        at org.apache.spark.deploy.client.Client$ClientActor.preStart(Client.scala:61)
        at akka.actor.ActorCell.create$1(ActorCell.scala:508)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:600)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:209)
        at akka.dispatch.Mailbox.run(Mailbox.scala:178)
        at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:516)
        at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
        at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
        at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1479)
        at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
Caused by: java.lang.IllegalArgumentException: maxFrameLength must be a positive integer: -1451229184
        at org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder.<init>(LengthFieldBasedFrameDecoder.java:270)
        at org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder.<init>(LengthFieldBasedFrameDecoder.java:236)
        at akka.remote.netty.ActiveRemoteClientPipelineFactory.getPipeline(Client.scala:340)
        at org.jboss.netty.bootstrap.ClientBootstrap.connect(ClientBootstrap.java:207)
        ... 18 more
13/12/24 19:20:33 ERROR SparkDeploySchedulerBackend: Disconnected from Spark cluster!
13/12/24 19:20:33 ERROR ClusterScheduler: Exiting due to error from cluster scheduler: Disconnected from Spark cluster

It seems to be caused by:
LengthFieldBasedFrameDecoder lenDec = new LengthFieldBasedFrameDecoder(this.client.netty().settings().MessageFrameSize(), 0, 4, 0, 4);
I don't know the value of this.client.netty().settings().MessageFrameSize() or how this value is calculated.

My Spark args:
export SPARK_DAEMON_MEMORY=4000m 
export SPARK_MEM=1000m 
export SPARK_WORKER_MEMORY=8g 
spark.akka.frameSize = 1000 / 2000 / 5000 / 10000 / 15000
spark.executor.memory  = 1g
spark.akka.askTimeout = 120

Any help or reply is very appreciated! Thanks very much.
    



[email protected]
