Re: Spark driver hangs on start of job

2015-07-02 Thread Richard Marscher
Ah I see, glad that simple patch works for your problem. That seems to be a
different underlying problem than we have been experiencing. In our case,
the executors are failing properly, its just that none of the new ones will
ever escape experiencing the same exact issue. So we start a death spiral
of thousands of failed executors, all of which can't connect with the
driver. Meanwhile, the driver just sits there in the "zombie" state doing
nothing while it waits for executors to respond. In that light, my solution
is geared towards solving things on the driver-side gracefully.


On Thu, Jul 2, 2015 at 4:37 AM, Sjoerd Mulder 
wrote:

> Hi Richard,
>
> I have actually applied the following fix to our 1.4.0 version and this
> seem to resolve the zombies :)
>
> https://github.com/apache/spark/pull/7077/files
>
> Sjoerd
>
> 2015-06-26 20:08 GMT+02:00 Richard Marscher :
>
>> Hi,
>>
>> we are on 1.3.1 right now so in case there are differences in the Spark
>> files I'll walk through the logic of what we did and post a couple gists at
>> the end. We haven't committed to forking Spark for our own deployments yet,
>> so right now we shadow some Spark classes in our application code with our
>> versions of the classes. Keep in mind I am not a Spark committer so the
>> following is a best effort basis that is working for us. But it may be that
>> someone more knowledgable about the Spark codebase might see a pitfall to
>> my solution or a better solution.
>>
>> --
>>
>> First, we'll start with the root issue in TaskSchedulerImpl. You will
>> find the code that prints the "Initial job has not accepted any resources"
>> warning inside the "submitTasks" function. Spark creates a separate thread
>> that checks some conditions every "STARVATION_TIMEOUT" milliseconds until
>> the submitted task set has been launched. It only posts the warn logging
>> here and does nothing. I will come back to this part of the code in a
>> moment.
>>
>> The code that determines when the "hasLaunchedTask" flag gets set (and
>> thus closes out the starvation thread and the task set is being worked on
>> by the cluster) is within the "resourceOffers" function. The various Spark
>> Scheduler Backend classes will periodically call this function in
>> TaskSchedulerImpl until cluster resources have been assigned to the task
>> set.
>>
>> To start signaling the zombied scenario, I created a new flag: "@volatile
>> private var hasZombied = false". In our experience we always get the
>> resources in resourceOffer before the starvation thread runs, otherwise we
>> have always hit the zombie scenario if resources weren't allocated yet. So
>> I added a conditional before the "if(tasks.size > 0) { hasLaunchedTask =
>> true }" block. The conditional checks "if(!hasLaunchedTask && hasZombied) {
>> dagScheduler.ourCustomFunction() }". I'll explain that DAGScheduler call in
>> a moment.
>>
>> The last detail here is to add code inside the starvation thread block
>> after it posts the warning log. Set "hasZombied" to true and then call
>> "this.cancel()" to stop the starvation thread from continuing to run. With
>> this we now have all the steps needed inside TaskSchedulerImpl to start
>> signaling out the zombied condition.
>>
>> Back to the custom function. DAGScheduler has reference to the
>> appropriate Spark listeners that can propagate errors to the task set and
>> more importantly back to your application code. If you look at DAGScheduler
>> class, you will find a function called "cleanUpAfterSchedulerStop()". This
>> function does everything we want, except it is hard coded to a specific
>> exception "val error = new SparkException(...)". What I did was copy this
>> and made another function that returned a custom Exception I created that I
>> use to signal the zombie, something like
>> SparkTaskResourceAllocationZombieError. Now you call this function within
>> the conditional block in TaskSchedulerImpl.resourceOffers and you should
>> see your exception propagating out to your application code so you can take
>> appropriate actions.
>>
>> In our case, we are submitting Spark applications programmatically from a
>> Scala application service on an EC2 instance to a Spark Standalone cluster
>> in EC2. Whenever we see this error, the application service EC2 instance is
>> unable to get resources from the cluster even when attempting subsequent
>> Spark applications for a long period of time (it eventually recovers hours
>> or days later but that is not useful for us). So in our case we need to
>> reschedule the failed Spark application on another EC2 application instance
>> and shut down this current EC2 instance because it can no longer get
>> cluster resources. Your use case may be different, but at least action can
>> be taken at an application level.
>>
>> Here is some source code, you should be able to locate most of my
>> additions to the code by searching for comments starting with "//
>> Localytics Code"
>> TaskSchedulerImpl gist:
>> https://gist

Re: Spark driver hangs on start of job

2015-07-02 Thread Sjoerd Mulder
Hi Richard,

I have actually applied the following fix to our 1.4.0 version and this
seem to resolve the zombies :)

https://github.com/apache/spark/pull/7077/files

Sjoerd

2015-06-26 20:08 GMT+02:00 Richard Marscher :

> Hi,
>
> we are on 1.3.1 right now so in case there are differences in the Spark
> files I'll walk through the logic of what we did and post a couple gists at
> the end. We haven't committed to forking Spark for our own deployments yet,
> so right now we shadow some Spark classes in our application code with our
> versions of the classes. Keep in mind I am not a Spark committer so the
> following is a best effort basis that is working for us. But it may be that
> someone more knowledgable about the Spark codebase might see a pitfall to
> my solution or a better solution.
>
> --
>
> First, we'll start with the root issue in TaskSchedulerImpl. You will find
> the code that prints the "Initial job has not accepted any resources"
> warning inside the "submitTasks" function. Spark creates a separate thread
> that checks some conditions every "STARVATION_TIMEOUT" milliseconds until
> the submitted task set has been launched. It only posts the warn logging
> here and does nothing. I will come back to this part of the code in a
> moment.
>
> The code that determines when the "hasLaunchedTask" flag gets set (and
> thus closes out the starvation thread and the task set is being worked on
> by the cluster) is within the "resourceOffers" function. The various Spark
> Scheduler Backend classes will periodically call this function in
> TaskSchedulerImpl until cluster resources have been assigned to the task
> set.
>
> To start signaling the zombied scenario, I created a new flag: "@volatile
> private var hasZombied = false". In our experience we always get the
> resources in resourceOffer before the starvation thread runs, otherwise we
> have always hit the zombie scenario if resources weren't allocated yet. So
> I added a conditional before the "if(tasks.size > 0) { hasLaunchedTask =
> true }" block. The conditional checks "if(!hasLaunchedTask && hasZombied) {
> dagScheduler.ourCustomFunction() }". I'll explain that DAGScheduler call in
> a moment.
>
> The last detail here is to add code inside the starvation thread block
> after it posts the warning log. Set "hasZombied" to true and then call
> "this.cancel()" to stop the starvation thread from continuing to run. With
> this we now have all the steps needed inside TaskSchedulerImpl to start
> signaling out the zombied condition.
>
> Back to the custom function. DAGScheduler has reference to the appropriate
> Spark listeners that can propagate errors to the task set and more
> importantly back to your application code. If you look at DAGScheduler
> class, you will find a function called "cleanUpAfterSchedulerStop()". This
> function does everything we want, except it is hard coded to a specific
> exception "val error = new SparkException(...)". What I did was copy this
> and made another function that returned a custom Exception I created that I
> use to signal the zombie, something like
> SparkTaskResourceAllocationZombieError. Now you call this function within
> the conditional block in TaskSchedulerImpl.resourceOffers and you should
> see your exception propagating out to your application code so you can take
> appropriate actions.
>
> In our case, we are submitting Spark applications programmatically from a
> Scala application service on an EC2 instance to a Spark Standalone cluster
> in EC2. Whenever we see this error, the application service EC2 instance is
> unable to get resources from the cluster even when attempting subsequent
> Spark applications for a long period of time (it eventually recovers hours
> or days later but that is not useful for us). So in our case we need to
> reschedule the failed Spark application on another EC2 application instance
> and shut down this current EC2 instance because it can no longer get
> cluster resources. Your use case may be different, but at least action can
> be taken at an application level.
>
> Here is some source code, you should be able to locate most of my
> additions to the code by searching for comments starting with "//
> Localytics Code"
> TaskSchedulerImpl gist:
> https://gist.github.com/rmarsch/e5d298e582ab75957957
> DAGScheduler gist: https://gist.github.com/rmarsch/ae8f5bb03b11e8d4f8f7
>
> Regards,
> Richard
>
> On Fri, Jun 26, 2015 at 12:08 PM, Sjoerd Mulder 
> wrote:
>
>> Hi Richard,
>>
>> I would  like to see how we can get a workaround to get out of the Zombie
>> situation since were planning for production :)
>>
>> If you could share the workaround or point directions that would be great!
>>
>> Sjoerd
>>
>> 2015-06-26 16:53 GMT+02:00 Richard Marscher :
>>
>>> We've seen this issue as well in production. We also aren't sure what
>>> causes it, but have just recently shaded some of the Spark code in
>>> TaskSchedulerImpl that we use to effectively bubble up an exception from
>>> Spark i

Re: Spark driver hangs on start of job

2015-06-26 Thread Richard Marscher
Hi,

we are on 1.3.1 right now so in case there are differences in the Spark
files I'll walk through the logic of what we did and post a couple gists at
the end. We haven't committed to forking Spark for our own deployments yet,
so right now we shadow some Spark classes in our application code with our
versions of the classes. Keep in mind I am not a Spark committer so the
following is a best effort basis that is working for us. But it may be that
someone more knowledgable about the Spark codebase might see a pitfall to
my solution or a better solution.

--

First, we'll start with the root issue in TaskSchedulerImpl. You will find
the code that prints the "Initial job has not accepted any resources"
warning inside the "submitTasks" function. Spark creates a separate thread
that checks some conditions every "STARVATION_TIMEOUT" milliseconds until
the submitted task set has been launched. It only posts the warn logging
here and does nothing. I will come back to this part of the code in a
moment.

The code that determines when the "hasLaunchedTask" flag gets set (and thus
closes out the starvation thread and the task set is being worked on by the
cluster) is within the "resourceOffers" function. The various Spark
Scheduler Backend classes will periodically call this function in
TaskSchedulerImpl until cluster resources have been assigned to the task
set.

To start signaling the zombied scenario, I created a new flag: "@volatile
private var hasZombied = false". In our experience we always get the
resources in resourceOffer before the starvation thread runs, otherwise we
have always hit the zombie scenario if resources weren't allocated yet. So
I added a conditional before the "if(tasks.size > 0) { hasLaunchedTask =
true }" block. The conditional checks "if(!hasLaunchedTask && hasZombied) {
dagScheduler.ourCustomFunction() }". I'll explain that DAGScheduler call in
a moment.

The last detail here is to add code inside the starvation thread block
after it posts the warning log. Set "hasZombied" to true and then call
"this.cancel()" to stop the starvation thread from continuing to run. With
this we now have all the steps needed inside TaskSchedulerImpl to start
signaling out the zombied condition.

Back to the custom function. DAGScheduler has reference to the appropriate
Spark listeners that can propagate errors to the task set and more
importantly back to your application code. If you look at DAGScheduler
class, you will find a function called "cleanUpAfterSchedulerStop()". This
function does everything we want, except it is hard coded to a specific
exception "val error = new SparkException(...)". What I did was copy this
and made another function that returned a custom Exception I created that I
use to signal the zombie, something like
SparkTaskResourceAllocationZombieError. Now you call this function within
the conditional block in TaskSchedulerImpl.resourceOffers and you should
see your exception propagating out to your application code so you can take
appropriate actions.

In our case, we are submitting Spark applications programmatically from a
Scala application service on an EC2 instance to a Spark Standalone cluster
in EC2. Whenever we see this error, the application service EC2 instance is
unable to get resources from the cluster even when attempting subsequent
Spark applications for a long period of time (it eventually recovers hours
or days later but that is not useful for us). So in our case we need to
reschedule the failed Spark application on another EC2 application instance
and shut down this current EC2 instance because it can no longer get
cluster resources. Your use case may be different, but at least action can
be taken at an application level.

Here is some source code, you should be able to locate most of my additions
to the code by searching for comments starting with "// Localytics Code"
TaskSchedulerImpl gist: https://gist.github.com/rmarsch/e5d298e582ab75957957
DAGScheduler gist: https://gist.github.com/rmarsch/ae8f5bb03b11e8d4f8f7

Regards,
Richard

On Fri, Jun 26, 2015 at 12:08 PM, Sjoerd Mulder 
wrote:

> Hi Richard,
>
> I would  like to see how we can get a workaround to get out of the Zombie
> situation since were planning for production :)
>
> If you could share the workaround or point directions that would be great!
>
> Sjoerd
>
> 2015-06-26 16:53 GMT+02:00 Richard Marscher :
>
>> We've seen this issue as well in production. We also aren't sure what
>> causes it, but have just recently shaded some of the Spark code in
>> TaskSchedulerImpl that we use to effectively bubble up an exception from
>> Spark instead of zombie in this situation. If you are interested I can go
>> into more detail about that. Otherwise I'm also keen to find out more on
>> how this might be happening.
>>
>> On Fri, Jun 26, 2015 at 8:28 AM, Sjoerd Mulder 
>> wrote:
>>
>>> Hi,
>>>
>>> I have a really annoying issue that i cannot replicate consistently,
>>> still it happens every +- 100 submissions. (it's a jo

Re: Spark driver hangs on start of job

2015-06-26 Thread Richard Marscher
We've seen this issue as well in production. We also aren't sure what
causes it, but have just recently shaded some of the Spark code in
TaskSchedulerImpl that we use to effectively bubble up an exception from
Spark instead of zombie in this situation. If you are interested I can go
into more detail about that. Otherwise I'm also keen to find out more on
how this might be happening.

On Fri, Jun 26, 2015 at 8:28 AM, Sjoerd Mulder 
wrote:

> Hi,
>
> I have a really annoying issue that i cannot replicate consistently, still
> it happens every +- 100 submissions. (it's a job that's running every 3
> minutes).
> Already reported an issue for this:
> https://issues.apache.org/jira/browse/SPARK-8592
>
> Here are the Thread dump of the Driver and the Executor:
> https://docs.google.com/document/d/1x7ZwUzlvRqeJQ12FoGhpLV1zqDAmVsaF2HYhzkPNBKQ
>
> Any direction is should look into?
>
> Spark 1.4.0
> Java 1.8.0_45 (Oracle Corporation)
> Scala 2.11.6
>
> I already tried to resolve the NPE by not logging the ActorRef. This makes
> the NPE go away :)
>
> But  the root cause lies deeper I expect, since then the driver then still
> hangs with the "*WARN TaskSchedulerImpl: Initial job has not accepted any
> resources; check your cluster UI to ensure that workers are registered and
> have sufficient resources*" messages. But there are enough resources
> available in the cluster, it has plenty of CPU and Memory left.
>
> Logs from Driver:
>
> 15/06/26 11:58:19 INFO Remoting: Starting remoting
> 15/06/26 11:58:19 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriver@172.17.0.123:51415]
> 15/06/26 11:58:19 INFO Utils: Successfully started service 'sparkDriver'
> on port 51415.
> 15/06/26 11:58:20 INFO SparkEnv: Registering MapOutputTracker
> 15/06/26 11:58:20 INFO SparkEnv: Registering BlockManagerMaster
> 15/06/26 11:58:20 INFO DiskBlockManager: Created local directory at
> /tmp/spark-ff1f5a88-4e1d-4fe0-9c54-890e4174f02a/blockmgr-92b1e974-53bb-45a3-b918-916759e14630
> 15/06/26 11:58:20 INFO MemoryStore: MemoryStore started with capacity
> 265.1 MB
> 15/06/26 11:58:20 INFO HttpFileServer: HTTP File server directory is
> /tmp/spark-ff1f5a88-4e1d-4fe0-9c54-890e4174f02a/httpd-f5894293-33aa-4eaa-9740-4a36c054b6c8
> 15/06/26 11:58:20 INFO HttpServer: Starting HTTP Server
> 15/06/26 11:58:20 INFO Utils: Successfully started service 'HTTP file
> server' on port 33176.
> 15/06/26 11:58:20 INFO SparkEnv: Registering OutputCommitCoordinator
> 15/06/26 11:58:20 INFO Utils: Successfully started service 'SparkUI' on
> port 4040.
> 15/06/26 11:58:20 INFO SparkUI: Started SparkUI at
> http://172.17.0.123:4040
> 15/06/26 11:58:20 INFO SparkContext: Added JAR
> file:/opt/jar/spark/spark-job-1.0-SNAPSHOT.jar at
> http://172.17.0.123:33176/jars/spark-job-1.0-SNAPSHOT.jar with timestamp
> 1435319900257
> 15/06/26 11:58:20 INFO AppClient$ClientActor: Connecting to master
> akka.tcp://sparkMaster@172.17.42.1:7077/user/Master...
> 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: Connected to Spark
> cluster with app ID app-20150626115820-0917
> 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor added:
> app-20150626115820-0917/0 on worker-20150625133752-10.0.7.171-47050 (
> 10.0.7.171:47050) with 1 cores
> 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: Granted executor ID
> app-20150626115820-0917/0 on hostPort 10.0.7.171:47050 with 1 cores, 2.0
> GB RAM
> 15/06/26 11:58:20 INFO TaskSchedulerImpl: Starting speculative execution
> thread
> 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor updated:
> app-20150626115820-0917/0 is now LOADING
> 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor updated:
> app-20150626115820-0917/0 is now RUNNING
> 15/06/26 11:58:20 INFO Utils: Successfully started service
> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52000.
> 15/06/26 11:58:20 INFO NettyBlockTransferService: Server created on 52000
> 15/06/26 11:58:20 INFO BlockManagerMaster: Trying to register BlockManager
> 15/06/26 11:58:20 INFO BlockManagerMasterEndpoint: Registering block
> manager 172.17.0.123:52000 with 265.1 MB RAM, BlockManagerId(driver,
> 172.17.0.123, 52000)
> 15/06/26 11:58:20 INFO BlockManagerMaster: Registered BlockManager
> 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: SchedulerBackend is
> ready for scheduling beginning after reached minRegisteredResourcesRatio:
> 0.0
> 15/06/26 11:58:24 INFO Exchange: Using SparkSqlSerializer2.
> 15/06/26 11:58:24 INFO Exchange: Using SparkSqlSerializer2.
> 15/06/26 11:58:24 INFO SparkContext: Starting job: map at
> SparkProductEventAggregator.scala:144
> 15/06/26 11:58:24 INFO Version: Elasticsearch Hadoop v2.1.0.rc1
> [5cc3f53084]
> 15/06/26 11:58:24 INFO ScalaEsRowRDD: Reading from
> [675d42c8-9823-4d3c-8e86-5aa611d38770/events]
> 15/06/26 11:58:24 INFO ScalaEsRowRDD: Discovered mapping
> {675d42c8-9823-4d3c-8e86-5aa611d38770=[REMOVED]} for
> [675d42c8-9823-4d3c-8e86-5aa611d38770/events]
> 15/06/26 11:58:24 INFO DAGSchedul

Spark driver hangs on start of job

2015-06-26 Thread Sjoerd Mulder
Hi,

I have a really annoying issue that i cannot replicate consistently, still
it happens every +- 100 submissions. (it's a job that's running every 3
minutes).
Already reported an issue for this:
https://issues.apache.org/jira/browse/SPARK-8592

Here are the Thread dump of the Driver and the Executor:
https://docs.google.com/document/d/1x7ZwUzlvRqeJQ12FoGhpLV1zqDAmVsaF2HYhzkPNBKQ

Any direction is should look into?

Spark 1.4.0
Java 1.8.0_45 (Oracle Corporation)
Scala 2.11.6

I already tried to resolve the NPE by not logging the ActorRef. This makes
the NPE go away :)

But  the root cause lies deeper I expect, since then the driver then still
hangs with the "*WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient resources*" messages. But there are enough resources
available in the cluster, it has plenty of CPU and Memory left.

Logs from Driver:

15/06/26 11:58:19 INFO Remoting: Starting remoting
15/06/26 11:58:19 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@172.17.0.123:51415]
15/06/26 11:58:19 INFO Utils: Successfully started service 'sparkDriver' on
port 51415.
15/06/26 11:58:20 INFO SparkEnv: Registering MapOutputTracker
15/06/26 11:58:20 INFO SparkEnv: Registering BlockManagerMaster
15/06/26 11:58:20 INFO DiskBlockManager: Created local directory at
/tmp/spark-ff1f5a88-4e1d-4fe0-9c54-890e4174f02a/blockmgr-92b1e974-53bb-45a3-b918-916759e14630
15/06/26 11:58:20 INFO MemoryStore: MemoryStore started with capacity 265.1
MB
15/06/26 11:58:20 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-ff1f5a88-4e1d-4fe0-9c54-890e4174f02a/httpd-f5894293-33aa-4eaa-9740-4a36c054b6c8
15/06/26 11:58:20 INFO HttpServer: Starting HTTP Server
15/06/26 11:58:20 INFO Utils: Successfully started service 'HTTP file
server' on port 33176.
15/06/26 11:58:20 INFO SparkEnv: Registering OutputCommitCoordinator
15/06/26 11:58:20 INFO Utils: Successfully started service 'SparkUI' on
port 4040.
15/06/26 11:58:20 INFO SparkUI: Started SparkUI at http://172.17.0.123:4040
15/06/26 11:58:20 INFO SparkContext: Added JAR
file:/opt/jar/spark/spark-job-1.0-SNAPSHOT.jar at
http://172.17.0.123:33176/jars/spark-job-1.0-SNAPSHOT.jar with timestamp
1435319900257
15/06/26 11:58:20 INFO AppClient$ClientActor: Connecting to master
akka.tcp://sparkMaster@172.17.42.1:7077/user/Master...
15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: Connected to Spark
cluster with app ID app-20150626115820-0917
15/06/26 11:58:20 INFO AppClient$ClientActor: Executor added:
app-20150626115820-0917/0 on worker-20150625133752-10.0.7.171-47050 (
10.0.7.171:47050) with 1 cores
15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: Granted executor ID
app-20150626115820-0917/0 on hostPort 10.0.7.171:47050 with 1 cores, 2.0 GB
RAM
15/06/26 11:58:20 INFO TaskSchedulerImpl: Starting speculative execution
thread
15/06/26 11:58:20 INFO AppClient$ClientActor: Executor updated:
app-20150626115820-0917/0 is now LOADING
15/06/26 11:58:20 INFO AppClient$ClientActor: Executor updated:
app-20150626115820-0917/0 is now RUNNING
15/06/26 11:58:20 INFO Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 52000.
15/06/26 11:58:20 INFO NettyBlockTransferService: Server created on 52000
15/06/26 11:58:20 INFO BlockManagerMaster: Trying to register BlockManager
15/06/26 11:58:20 INFO BlockManagerMasterEndpoint: Registering block
manager 172.17.0.123:52000 with 265.1 MB RAM, BlockManagerId(driver,
172.17.0.123, 52000)
15/06/26 11:58:20 INFO BlockManagerMaster: Registered BlockManager
15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: SchedulerBackend is
ready for scheduling beginning after reached minRegisteredResourcesRatio:
0.0
15/06/26 11:58:24 INFO Exchange: Using SparkSqlSerializer2.
15/06/26 11:58:24 INFO Exchange: Using SparkSqlSerializer2.
15/06/26 11:58:24 INFO SparkContext: Starting job: map at
SparkProductEventAggregator.scala:144
15/06/26 11:58:24 INFO Version: Elasticsearch Hadoop v2.1.0.rc1 [5cc3f53084]
15/06/26 11:58:24 INFO ScalaEsRowRDD: Reading from
[675d42c8-9823-4d3c-8e86-5aa611d38770/events]
15/06/26 11:58:24 INFO ScalaEsRowRDD: Discovered mapping
{675d42c8-9823-4d3c-8e86-5aa611d38770=[REMOVED]} for
[675d42c8-9823-4d3c-8e86-5aa611d38770/events]
15/06/26 11:58:24 INFO DAGScheduler: Registering RDD 5 (map at
SparkProductEventAggregator.scala:144)
15/06/26 11:58:24 INFO DAGScheduler: Got job 0 (map at
SparkProductEventAggregator.scala:144) with 200 output partitions
(allowLocal=false)
15/06/26 11:58:24 INFO DAGScheduler: Final stage: ResultStage 1(map at
SparkProductEventAggregator.scala:144)
15/06/26 11:58:24 INFO DAGScheduler: Parents of final stage:
List(ShuffleMapStage 0)
15/06/26 11:58:24 INFO DAGScheduler: Missing parents: List(ShuffleMapStage
0)
15/06/26 11:58:24 INFO DAGScheduler: Submitting ShuffleMapStage 0
(MapPartitionsRDD[5] at map at SparkProductEventAggregator.scala:144),