Thanks Micah! Logged https://issues.apache.org/jira/browse/CRUNCH-566
On Wed, Sep 30, 2015 at 3:35 PM Micah Whitacre <[email protected]> wrote:

> Do you mind logging a bug for the multiple parallel pipelines and done()
> call? The done() should wait for all stages to complete before closing the
> context and avoid that error.
>
> On Wed, Sep 30, 2015 at 3:33 PM, Nithin Asokan <[email protected]> wrote:
>
>> Hey Micah,
>> I tried the approaches you mentioned (gist updated:
>> <https://gist.github.com/nasokan/7a0820411656f618f182>). Both MRPipeline
>> and SparkPipeline appear to submit and run parallel jobs. SparkPipeline
>> supported it only when using *Pipeline#runAsync()*; also, SparkPipeline
>> fails when *Pipeline#done()* is used, and removing it makes my pipeline
>> work.
>>
>> Here is the error I see when I use:
>>
>> pipeline.write(path1);
>> pipeline.runAsync();
>> pipeline.write(path2);
>> pipeline.runAsync();
>> pipeline.done();
>>
>> Exception in thread "Thread-37" java.lang.NullPointerException
>>     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeBlocks$1.apply(TorrentBroadcast.scala:103)
>>     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeBlocks$1.apply(TorrentBroadcast.scala:102)
>>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>     at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
>>     at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
>>     at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>     at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
>>     at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>     at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
>>     at org.apache.spark.rdd.NewHadoopRDD.<init>(NewHadoopRDD.scala:77)
>>     at org.apache.spark.SparkContext.newAPIHadoopRDD(SparkContext.scala:878)
>>     at org.apache.spark.api.java.JavaSparkContext.newAPIHadoopRDD(JavaSparkContext.scala:516)
>>     at org.apache.crunch.impl.spark.collect.InputCollection.getJavaRDDLike(InputCollection.java:51)
>>     at org.apache.crunch.impl.spark.collect.DoCollection.getJavaRDDLikeInternal(DoCollection.java:55)
>>     at org.apache.crunch.impl.spark.collect.DoCollection.getJavaRDDLike(DoCollection.java:44)
>>     at org.apache.crunch.impl.spark.SparkRuntime.monitorLoop(SparkRuntime.java:295)
>>     at org.apache.crunch.impl.spark.SparkRuntime.access$000(SparkRuntime.java:80)
>>     at org.apache.crunch.impl.spark.SparkRuntime$2.run(SparkRuntime.java:139)
>>     at java.lang.Thread.run(Thread.java:745)
>> Exception in thread "Thread-38" java.lang.NullPointerException
>>     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeBlocks$1.apply(TorrentBroadcast.scala:103)
>>     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeBlocks$1.apply(TorrentBroadcast.scala:102)
>>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>     at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
>>     at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
>>     at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>     at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
>>     at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>     at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
>>     at org.apache.spark.rdd.NewHadoopRDD.<init>(NewHadoopRDD.scala:77)
>>     at org.apache.spark.SparkContext.newAPIHadoopRDD(SparkContext.scala:878)
>>     at org.apache.spark.api.java.JavaSparkContext.newAPIHadoopRDD(JavaSparkContext.scala:516)
>>     at org.apache.crunch.impl.spark.collect.InputCollection.getJavaRDDLike(InputCollection.java:51)
>>     at org.apache.crunch.impl.spark.collect.DoCollection.getJavaRDDLikeInternal(DoCollection.java:55)
>>     at org.apache.crunch.impl.spark.collect.DoCollection.getJavaRDDLike(DoCollection.java:44)
>>     at org.apache.crunch.impl.spark.SparkRuntime.monitorLoop(SparkRuntime.java:295)
>>     at org.apache.crunch.impl.spark.SparkRuntime.access$000(SparkRuntime.java:80)
>>     at org.apache.crunch.impl.spark.SparkRuntime$2.run(SparkRuntime.java:139)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> However, I didn't see the same error with this code:
>>
>> pipeline.write(path1);
>> pipeline.runAsync();
>> pipeline.write(path2);
>> pipeline.done();
>>
>> Instead, when one of the jobs completes, it closes the SparkContext,
>> which terminates any jobs that are currently active.
>>
>> [2015-09-30 15:18:55,835] [ERROR] [Thread-37]
>> [org.apache.crunch.impl.spark.SparkRuntime] - Spark Exception
>> org.apache.spark.SparkException: Job cancelled because SparkContext was
>> shut down
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:698)
>>     at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
>>     at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:698)
>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1411)
>>     at org.apache.spark.util.EventLoop.stop(EventLoop.scala:81)
>>     at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1346)
>>     at org.apache.spark.SparkContext.stop(SparkContext.scala:1386)
>>     at org.apache.spark.api.java.JavaSparkContext.stop(JavaSparkContext.scala:652)
>>     at org.apache.crunch.impl.spark.SparkPipeline.done(SparkPipeline.java:178)
>>     at com.test.ParallelAction.run(ParallelAction.java:47)
>>     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>>     at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
>>     at com.test.ParallelAction.main(ParallelAction.java:53)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
>>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
>>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
>>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
>>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>> So it appears that we can still launch parallel jobs using *runAsync()*
>> (no *done()* should be used), but I'm not sure
>> if it's feasible for Crunch to determine independent SourceTargets and
>> launch them in parallel. It would make a nice feature :)
>>
>> Thanks
>> Nithin
>>
>> On Wed, Sep 30, 2015 at 12:54 PM Micah Whitacre <[email protected]> wrote:
>>
>>> Try switching your test around a bit, because I believe there are
>>> instances even with MRPipeline where Crunch will kick off multiple jobs
>>> in parallel.
>>>
>>> Something like the following:
>>>
>>> Read Input1 -> Filter -> Write Output1
>>> Read Input2 -> Filter -> Write Output2
>>> pipeline.done();
>>>
>>> Try it with MRPipeline and then with Spark to see what runs in parallel
>>> vs. what runs serially.
>>>
>>> The other, less ideal, option is that you could change your code to be:
>>>
>>> Read Input1 -> Filter -> Write Output1
>>> pipeline.runAsync()
>>> Read Input2 -> Filter -> Write Output2
>>> pipeline.runAsync()
>>> pipeline.done();
>>>
>>> This should kick each of them off independently and give you the
>>> parallelism. It would be nice, however, if you didn't have to do this
>>> splitting yourself and it was done for you.
>>>
>>> On Wed, Sep 30, 2015 at 12:41 PM, Nithin Asokan <[email protected]> wrote:
>>>
>>>> I was reading about the Spark scheduler [1], and this line caught my
>>>> attention:
>>>>
>>>> *Inside a given Spark application (SparkContext instance), multiple
>>>> parallel jobs can run simultaneously if they were submitted from
>>>> separate threads. By "job", in this section, we mean a Spark action
>>>> (e.g. save, collect) and any tasks that need to run to evaluate that
>>>> action. Spark's scheduler is fully thread-safe and supports this use
>>>> case to enable applications that serve multiple requests (e.g. queries
>>>> for multiple users).*
>>>>
>>>> If I understood the above statement correctly, it is possible to have
>>>> multiple jobs running in parallel in a single Spark application, as
>>>> long as the *actions* are triggered by separate threads.
>>>>
>>>> I was trying to test this out on my Crunch Spark application
>>>> (yarn-client), which reads two independent HDFS sources and performs
>>>> *PCollection#length()* on each source. The Spark WebUI shows Job1 as
>>>> submitted; only after Job1 completes is Job2 submitted and finished. I
>>>> would like to get some thoughts on whether it is possible for Crunch to
>>>> identify independent sources/targets and create separate threads that
>>>> interact with the Spark scheduler. That way, independent jobs could run
>>>> in parallel.
>>>>
>>>> Here is the example that I used:
>>>> https://gist.github.com/nasokan/7a0820411656f618f182
>>>>
>>>> [1]
>>>> https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
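The "separate threads" behavior quoted from the Spark docs above can be illustrated with plain Spark, outside of Crunch. This is only a sketch under assumed names: the paths, app name, and local master are placeholders, and each count() is the action that becomes a Spark job.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class ParallelActionsSketch {
      public static void main(String[] args) throws InterruptedException {
        // App name and master are placeholders; in yarn-client mode the
        // master normally comes from spark-submit.
        JavaSparkContext jsc = new JavaSparkContext(
            new SparkConf().setAppName("parallel-actions").setMaster("local[*]"));

        // Each count() is an action, i.e. a Spark job. Submitting the two
        // actions from separate threads lets the scheduler run them
        // concurrently within the same SparkContext.
        Thread job1 = new Thread(() -> jsc.textFile("/data/input1").count());
        Thread job2 = new Thread(() -> jsc.textFile("/data/input2").count());
        job1.start();
        job2.start();

        // Stop the context only after both jobs finish; stopping earlier
        // cancels the in-flight job ("Job cancelled because SparkContext
        // was shut down"), as seen in the trace earlier in the thread.
        job1.join();
        job2.join();
        jsc.stop();
      }
    }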
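And here is a minimal sketch of the Crunch runAsync() pattern the thread converges on. It is not the code from the gist: the paths, the NonEmpty filter, and the app name are placeholders, and it waits on the individual PipelineExecutions instead of calling done(), which (per CRUNCH-566) shuts the SparkContext down prematurely after parallel runAsync() calls.

    import org.apache.crunch.FilterFn;
    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.PipelineExecution;
    import org.apache.crunch.impl.spark.SparkPipeline;
    import org.apache.crunch.io.From;
    import org.apache.crunch.io.To;

    public class ParallelWritesSketch {

      // Placeholder filter; the real logic lives in the linked gist.
      static class NonEmpty extends FilterFn<String> {
        @Override
        public boolean accept(String input) {
          return input != null && !input.isEmpty();
        }
      }

      public static void main(String[] args) throws InterruptedException {
        // "yarn-client" and the app name are illustrative; an MRPipeline can
        // be substituted with the same structure.
        Pipeline pipeline = new SparkPipeline("yarn-client", "parallel-writes");

        // Branch 1: read -> filter -> write, kicked off without blocking.
        PCollection<String> in1 = pipeline.read(From.textFile("/data/input1"));
        pipeline.write(in1.filter(new NonEmpty()), To.textFile("/data/output1"));
        PipelineExecution exec1 = pipeline.runAsync();

        // Branch 2: an independent source/target pair submitted from the
        // same pipeline while branch 1 is still running.
        PCollection<String> in2 = pipeline.read(From.textFile("/data/input2"));
        pipeline.write(in2.filter(new NonEmpty()), To.textFile("/data/output2"));
        PipelineExecution exec2 = pipeline.runAsync();

        // Block on both executions instead of pipeline.done(); per the
        // thread, done() here triggers the NPE / early context shutdown.
        exec1.waitUntilDone();
        exec2.waitUntilDone();
      }
    }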
