I still cannot reproduce it with 2 nodes (4 CPUs).

Your repro.py could be made faster (10 min instead of the 22 min it took before):

inpdata.map(lambda (pc, x): (x, pc=='p' and 2 or 1)) \
       .reduceByKey(lambda x, y: x|y) \
       .filter(lambda (x, pc): pc==3) \
       .collect()

(also, no cache needed anymore)
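
For anyone following along, here is a minimal sketch of what that one-liner computes, written in plain Python 3 so it runs without a cluster. It is not the repro itself. The helper names (flag, keys_seen_in_both) and the sample records are hypothetical, and the non-'p' marker is assumed to be 'c'. Only the flag values 2 and 1, the bitwise OR, and the ==3 filter come from the pipeline above. Note that the tuple-unpacking lambdas used above are Python 2 syntax and were removed in Python 3.

```python
def flag(pc):
    # Same mapping as `pc=='p' and 2 or 1`: 'p' records set bit 2, all others set bit 1.
    return 2 if pc == 'p' else 1


def keys_seen_in_both(records):
    """Emulate map -> reduceByKey(OR) -> filter(flag == 3) on (pc, x) pairs."""
    flags = {}
    for pc, x in records:
        # reduceByKey(lambda x, y: x | y): OR together all flags seen for a key.
        flags[x] = flags.get(x, 0) | flag(pc)
    # A combined flag of 3 means the key appeared with 'p' and with a non-'p' marker.
    return sorted((x, f) for x, f in flags.items() if f == 3)


# Hypothetical sample data; 'c' as the second marker is an assumption.
sample = [('p', 'a'), ('c', 'a'), ('p', 'b'), ('c', 'c'), ('c', 'a')]
result = keys_seen_in_both(sample)
print(result)
```

Because each key carries only a small integer through the shuffle, and OR is associative and commutative, the values can be combined on the map side before shuffling, which is presumably why this form is faster and needs no cache.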

Davies



On Tue, Jan 6, 2015 at 9:02 AM, Sven Krasser <kras...@gmail.com> wrote:
> The issue has been sensitive to the number of executors and input data size.
> I'm using 2 executors with 4 cores each, 25GB of memory, 3800MB of memory
> overhead for YARN. This will fit onto Amazon r3 instance types.
> -Sven
>
> On Tue, Jan 6, 2015 at 12:46 AM, Davies Liu <dav...@databricks.com> wrote:
>>
>> I ran your scripts on a 5-node cluster (2 CPUs, 8G mem) and cannot
>> reproduce your failure. Should I test it with big-memory nodes?
>>
>> On Mon, Jan 5, 2015 at 4:00 PM, Sven Krasser <kras...@gmail.com> wrote:
>> > Thanks for the input! I've managed to come up with a repro of the error
>> > with test data only (and without any of the custom code in the original
>> > script), please see here:
>> > https://gist.github.com/skrasser/4bd7b41550988c8f6071#file-gistfile1-md
>> >
>> > The Gist contains a data generator and the script reproducing the error
>> > (plus driver and executor logs). If I run using full cluster capacity
>> > (32 executors with 28GB), there are no issues. If I run on only two, the
>> > error appears again and the job fails:
>> >
>> > org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@294b55b7)
>> >
>> >
>> > Any thoughts or any obvious problems you can spot by any chance?
>> >
>> > Thank you!
>> > -Sven
>> >
>> > On Sun, Jan 4, 2015 at 1:11 PM, Josh Rosen <rosenvi...@gmail.com> wrote:
>> >>
>> >> It doesn’t seem like there are many clues to go on here without
>> >> seeing the job code.  The original “org.apache.spark.SparkException:
>> >> PairwiseRDD: unexpected value: List([B@130dc7ad)” error suggests that
>> >> maybe there’s an issue with PySpark’s serialization / tracking of types,
>> >> but it’s hard to say from this error trace alone.
>> >>
>> >> On December 30, 2014 at 5:17:08 PM, Sven Krasser (kras...@gmail.com) wrote:
>> >>
>> >> Hey Josh,
>> >>
>> >> I am still trying to prune this to a minimal example, but it has been
>> >> tricky since scale seems to be a factor. The job runs over ~720GB of
>> >> data (the cluster's total RAM is ~900GB, split across 32 executors).
>> >> I've managed to run it over a vastly smaller data set without issues.
>> >> Curiously, when I run it over a slightly smaller data set of ~230GB
>> >> (using sort-based shuffle), my job also fails, but I see no shuffle
>> >> errors in the executor logs. All I see is the error below from the
>> >> driver (this is also what the driver prints when erroring out on the
>> >> large data set, but I assumed the executor errors to be the root cause).
>> >>
>> >> Any idea on where to look in the interim for more hints? I'll continue
>> >> to try to get to a minimal repro.
>> >>
>> >> 2014-12-30 21:35:34,539 INFO  [sparkDriver-akka.actor.default-dispatcher-14] spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked to send map output locations for shuffle 0 to sparkexecu...@ip-10-20-80-60.us-west-1.compute.internal:39739
>> >> 2014-12-30 21:35:39,512 INFO  [sparkDriver-akka.actor.default-dispatcher-17] spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked to send map output locations for shuffle 0 to sparkexecu...@ip-10-20-80-62.us-west-1.compute.internal:42277
>> >> 2014-12-30 21:35:58,893 WARN  [sparkDriver-akka.actor.default-dispatcher-16] remote.ReliableDeliverySupervisor (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system [akka.tcp://sparkyar...@ip-10-20-80-64.us-west-1.compute.internal:49584] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
>> >> 2014-12-30 21:35:59,044 ERROR [Yarn application state monitor] cluster.YarnClientSchedulerBackend (Logging.scala:logError(75)) - Yarn application has already exited with state FINISHED!
>> >> 2014-12-30 21:35:59,056 INFO  [Yarn application state monitor] handler.ContextHandler (ContextHandler.java:doStop(788)) - stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
>> >>
>> >> [...]
>> >>
>> >> 2014-12-30 21:35:59,111 INFO  [Yarn application state monitor] ui.SparkUI (Logging.scala:logInfo(59)) - Stopped Spark web UI at http://ip-10-20-80-37.us-west-1.compute.internal:4040
>> >> 2014-12-30 21:35:59,130 INFO  [Yarn application state monitor] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stopping DAGScheduler
>> >> 2014-12-30 21:35:59,131 INFO  [Yarn application state monitor] cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Shutting down all executors
>> >> 2014-12-30 21:35:59,132 INFO  [sparkDriver-akka.actor.default-dispatcher-14] cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Asking each executor to shut down
>> >> 2014-12-30 21:35:59,132 INFO  [Thread-2] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Job 1 failed: collect at /home/hadoop/test_scripts/test.py:63, took 980.751936 s
>> >> 2014-12-30 21:35:59,140 INFO  [Yarn application state monitor] cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Stopped
>> >> Traceback (most recent call last):
>> >>   File "/home/hadoop/test_scripts/test.py", line 63, in <module>
>> >>     result = j.collect()
>> >>   File "/home/hadoop/spark/python/pyspark/rdd.py", line 676, in collect
>> >>     bytesInJava = self._jrdd.collect().iterator()
>> >>   File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
>> >>   File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
>> >> py4j.protocol.Py4JJavaError: An error occurred while calling o117.collect.
>> >> : org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
>> >>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
>> >>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:701)
>> >>         at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
>> >>         at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:701)
>> >>         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1428)
>> >>         at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
>> >>         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundPostStop(DAGScheduler.scala:1375)
>> >>         at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>> >>         at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>> >>         at akka.actor.ActorCell.terminate(ActorCell.scala:369)
>> >>         at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
>> >>         at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>> >>         at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
>> >>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> >>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>> >>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> >>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> >>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> >>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >>
>> >>
>> >> Thank you!
>> >> -Sven
>> >>
>> >>
>> >> On Tue, Dec 30, 2014 at 12:15 PM, Josh Rosen <rosenvi...@gmail.com> wrote:
>> >>>
>> >>> Hi Sven,
>> >>>
>> >>> Do you have a small example program that you can share which will
>> >>> allow me to reproduce this issue?  If you have a workload that runs
>> >>> into this, you should be able to keep iteratively simplifying the job
>> >>> and reducing the data set size until you hit a fairly minimal
>> >>> reproduction (assuming the issue is deterministic, which it sounds
>> >>> like it is).
>> >>>
>> >>> On Tue, Dec 30, 2014 at 9:49 AM, Sven Krasser <kras...@gmail.com> wrote:
>> >>>>
>> >>>> Hey all,
>> >>>>
>> >>>> Since upgrading to 1.2.0, a pyspark job that worked fine in 1.1.1
>> >>>> fails during shuffle. I've tried reverting from the sort-based shuffle
>> >>>> back to the hash one, and that fails as well. Does anyone see similar
>> >>>> problems or have an idea on where to look next?
>> >>>>
>> >>>> For the sort-based shuffle I get a bunch of exceptions like this in
>> >>>> the executor logs:
>> >>>>
>> >>>> 2014-12-30 03:13:04,061 ERROR [Executor task launch worker-2] executor.Executor (Logging.scala:logError(96)) - Exception in task 4523.0 in stage 1.0 (TID 4524)
>> >>>> org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@130dc7ad)
>> >>>>         at org.apache.spark.api.python.PairwiseRDD$$anonfun$compute$2.apply(PythonRDD.scala:307)
>> >>>>         at org.apache.spark.api.python.PairwiseRDD$$anonfun$compute$2.apply(PythonRDD.scala:305)
>> >>>>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>> >>>>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:219)
>> >>>>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
>> >>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>> >>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> >>>>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> >>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> >>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> >>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> >>>>         at java.lang.Thread.run(Thread.java:745)
>> >>>>
>> >>>>
>> >>>> For the hash-based shuffle, there are now a bunch of these exceptions
>> >>>> in the logs:
>> >>>>
>> >>>>
>> >>>> 2014-12-30 04:14:01,688 ERROR [Executor task launch worker-0] executor.Executor (Logging.scala:logError(96)) - Exception in task 4479.0 in stage 1.0 (TID 4480)
>> >>>> java.io.FileNotFoundException: /mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1419905501183_0004/spark-local-20141230035728-8fc0/23/merged_shuffle_1_68_0 (No such file or directory)
>> >>>>         at java.io.FileOutputStream.open(Native Method)
>> >>>>         at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
>> >>>>         at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
>> >>>>         at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)
>> >>>>         at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)
>> >>>>         at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
>> >>>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >>>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >>>>         at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
>> >>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>> >>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> >>>>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> >>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> >>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> >>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> >>>>         at java.lang.Thread.run(Thread.java:745)
>> >>>>
>> >>>>
>> >>>> Thank you!
>> >>>> -Sven
>> >>>>
>> >>>>
>> >>>>
>> >>>> --
>> >>>> http://sites.google.com/site/krasser/?utm_source=sig
