GitHub user YanTangZhai opened a pull request:

    https://github.com/apache/spark/pull/1548

    [SPARK-2647] DAGScheduler plugs other JobSubmitted events when processing one JobSubmitted event

    If several jobs are submitted, DAGScheduler blocks the other JobSubmitted events while it processes one JobSubmitted event. For example, one JobSubmitted event is processed as follows and takes a long time:
    "spark-akka.actor.default-dispatcher-67" daemon prio=10 
tid=0x00007f75ec001000 nid=0x7dd6 in Object.wait() [0x00007f76063e1000]
    java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:503)
    at org.apache.hadoopcdh3.ipc.Client.call(Client.java:1130)
    locked <0x0000000783b17330> (a org.apache.hadoopcdh3.ipc.Client$Call)
    at org.apache.hadoopcdh3.ipc.RPC$Invoker.invoke(RPC.java:241)
    at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
    at sun.reflect.GeneratedMethodAccessor86.invoke(Unknown Source)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at 
org.apache.hadoopcdh3.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:83)
    at 
org.apache.hadoopcdh3.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:60)
    at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
    at 
org.apache.hadoopcdh3.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1472)
    at 
org.apache.hadoopcdh3.hdfs.DFSClient.getBlockLocations(DFSClient.java:1498)
    at 
org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem$1.doCall(Cdh3DistributedFileSystem.java:208)
    at 
org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem$1.doCall(Cdh3DistributedFileSystem.java:204)
    at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at 
org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem.getFileBlockLocations(Cdh3DistributedFileSystem.java:204)
    at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1812)
    at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1797)
    at 
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:233)
    at 
StorageEngineClient.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:141)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:54)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:54)
    at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:54)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
    at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:197)
    at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:272)
    at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:269)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:269)
    at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:274)
    at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:269)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:269)
    at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:274)
    at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:269)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:269)
    at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:274)
    at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:269)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:269)
    at 
org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:279)
    at org.apache.spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:219)
    at 
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:676)
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1180)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Meanwhile, the other JobSubmitted event hangs as follows:
    "pool-8-thread-31" prio=10 tid=0x00007f78a8287800 nid=0x8cc in 
Object.wait() [0x00007f7585654000]
    java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:503)
    at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
    locked <0x00000007a59453d8> (a org.apache.spark.scheduler.JobWaiter)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:451)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1048)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1066)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1087)
    at shark.execution.FileSinkOperator.execute(FileSinkOperator.scala:165)
    at shark.execution.SparkTask.execute(SparkTask.scala:99)
    at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:144)
    at 
org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:57)
    at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1362)
    at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1146)
    at org.apache.hadoop.hive.ql.Driver.run(Driver.java:952)
    at shark.SharkServerHandler.execute(SharkServer.scala:284)
    at shark.GatedSharkServerHandler.execute(SharkServer.scala:240)
    at 
org.apache.hadoop.hive.service.ThriftHive$Processor$execute.getResult(ThriftHive.java:644)
    at 
org.apache.hadoop.hive.service.ThriftHive$Processor$execute.getResult(ThriftHive.java:628)
    at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
    at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
    at 
org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:206)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
    I think DAGScheduler could use a dedicated thread to run handleJobSubmitted, so that one slow job submission does not block the others.
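    For illustration, here is a self-contained sketch of that idea (hypothetical: the object and names below are mine, not Spark's actual code). The single event-loop thread only dispatches JobSubmitted to a dedicated thread, so a slow handleJobSubmitted no longer stalls the mailbox:

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._

    // Hypothetical sketch, not the actual DAGScheduler code.
    object JobSubmittedOffloadSketch {
      // Stands in for the single actor thread that drains the event mailbox.
      val eventLoop = ExecutionContext.fromExecutorService(
        Executors.newSingleThreadExecutor())
      // Proposed: a dedicated thread just for JobSubmitted handling.
      val jobSubmittedThread = ExecutionContext.fromExecutorService(
        Executors.newSingleThreadExecutor())

      // Stands in for stage creation, where rdd.partitions can spend a long
      // time in HDFS getBlockLocations (see the first thread dump above).
      def handleJobSubmitted(jobId: Int): Unit = {
        Thread.sleep(2000)
        println(s"JobSubmitted($jobId) handled")
      }

      def main(args: Array[String]): Unit = {
        // The event loop pays only for the dispatch, not for the slow work.
        Future { Future(handleJobSubmitted(1))(jobSubmittedThread) }(eventLoop)
        // A later event is no longer stuck behind the slow JobSubmitted.
        val next = Future { println("next event processed promptly") }(eventLoop)
        Await.ready(next, 1.second) // completes well before job 1 finishes
        // Let queued work drain, then release both threads.
        jobSubmittedThread.shutdown()
        eventLoop.shutdown()
      }
    }

    A single dedicated thread (rather than a pool) would keep JobSubmitted events in submission order; making handleJobSubmitted safe to run concurrently with the other event handlers is the part an actual patch would still have to address.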

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/YanTangZhai/spark SPARK-2647

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1548.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1548
    
----
commit f3b5566ef0f04d5f2b42f7fc1dc41d3fcc69af8b
Author: YanTangZhai <[email protected]>
Date:   2014-07-23T14:01:58Z

    Update DAGScheduler.scala

----

