Re: [GraphX]: Prevent recomputation of DAG

2024-03-18 Thread Mich Talebzadeh
Hi,

I must admit I don't know much about this Fruchterman-Reingold (call
it FR) visualization using GraphX and Kubernetes, but you are saying
that this slowdown starts after the second iteration and that
caching/persisting the graph after each iteration does not help. FR
involves many computations between vertex pairs. In the MapReduce (or
shuffle) steps, data may be shuffled across the network, which hurts
performance for large graphs. The usual way to verify this is through
the Spark UI, in the Stages, SQL and Executors tabs, where you can see
the time taken for each step and the amount of read/write, etc. Also,
repeatedly creating and destroying GraphX graphs in each iteration may
lead to garbage collection (GC) overhead. So you should consider
profiling your application to identify bottlenecks and pinpoint which
part of the code is causing the slowdown. As I mentioned, Spark offers
profiling tools like the Spark UI, and there are third-party libraries
for this purpose.
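
One thing worth checking first: persist() on a GraphX graph is lazy, and the
previous iteration's graph keeps occupying executor memory until it is
explicitly released. A minimal sketch of forcing materialization and dropping
the old graph each iteration (the helper and variable names are illustrative,
not from your code):

import org.apache.spark.graphx.Graph
import org.apache.spark.storage.StorageLevel

// hypothetical helper: cache the new graph, force it to materialize, then
// drop the old one so only one cached copy per iteration stays on the executors
def swapGraphs[VD, ED](oldGraph: Graph[VD, ED], newGraph: Graph[VD, ED]): Graph[VD, ED] = {
  newGraph.persist(StorageLevel.MEMORY_AND_DISK)
  newGraph.vertices.count()             // persist is lazy: force the vertices
  newGraph.edges.count()                // ... and the edges to be computed and cached
  oldGraph.unpersist(blocking = false)  // release the previous iteration's copy
  newGraph
}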

HTH


Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Werner Von Braun).




On Sun, 17 Mar 2024 at 18:45, Marek Berith  wrote:
>
> Dear community,
> for my diploma thesis, we are implementing a distributed version of the
> Fruchterman-Reingold visualization algorithm, using GraphX and Kubernetes. Our
> solution is a backend that continuously computes new positions of vertices in
> a graph and sends them via RabbitMQ to a consumer. Fruchterman-Reingold is an
> iterative algorithm: in each iteration, repulsive and attractive forces
> between vertices are computed, and new positions of the vertices are then
> computed from those forces. Graph vertices and edges are stored in a GraphX
> graph structure. Forces between vertices are computed using MapReduce
> (between each pair of vertices) and aggregateMessages (for vertices connected
> via edges). After an iteration of the algorithm, the recomputed positions
> from the RDD are serialized using collect and sent to the RabbitMQ queue.
>
> Here comes the issue. The first two iterations of the algorithm are quick,
> but from the third iteration on the algorithm becomes very slow, until it
> reaches a point at which it cannot finish an iteration in real time. It seems
> that caching of the graph may be the issue, because if we serialize the graph
> into an array after each iteration and create a new graph from that array in
> the next iteration, we get constant memory usage and each iteration takes the
> same amount of time. We have already tried to cache/persist/checkpoint the
> graph after each iteration, but it didn't help, so maybe we are doing
> something wrong. We do not think that serializing the graph into an array
> should be the solution for a library as sophisticated as Apache Spark. I'm
> also not very confident about how this workaround will affect performance for
> large graphs or in a parallel environment. We are attaching a short example
> of code that shows one iteration of the algorithm, along with example input
> and output.
>
> We would appreciate it if you could help us fix this issue or give us any
> meaningful ideas, as we have tried everything that came to mind.
>
> We look forward to your reply.
> Thank you, Marek Berith
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[GraphX]: Prevent recomputation of DAG

2024-03-17 Thread Marek Berith

Dear community,
for my diploma thesis, we are implementing a distributed version of the
Fruchterman-Reingold visualization algorithm, using GraphX and Kubernetes. Our
solution is a backend that continuously computes new positions of vertices in a
graph and sends them via RabbitMQ to a consumer. Fruchterman-Reingold is an
iterative algorithm: in each iteration, repulsive and attractive forces between
vertices are computed, and new positions of the vertices are then computed from
those forces. Graph vertices and edges are stored in a GraphX graph structure.
Forces between vertices are computed using MapReduce (between each pair of
vertices) and aggregateMessages (for vertices connected via edges). After an
iteration of the algorithm, the recomputed positions from the RDD are
serialized using collect and sent to the RabbitMQ queue.


Here comes the issue. The first two iterations of the algorithm are quick, but
from the third iteration on the algorithm becomes very slow, until it reaches a
point at which it cannot finish an iteration in real time. It seems that
caching of the graph may be the issue, because if we serialize the graph into
an array after each iteration and create a new graph from that array in the
next iteration, we get constant memory usage and each iteration takes the same
amount of time. We have already tried to cache/persist/checkpoint the graph
after each iteration, but it didn't help, so maybe we are doing something
wrong. We do not think that serializing the graph into an array should be the
solution for a library as sophisticated as Apache Spark. I'm also not very
confident about how this workaround will affect performance for large graphs
or in a parallel environment. We are attaching a short example of code that
shows one iteration of the algorithm, along with example input and output.


We would appreciate it if you could help us fix this issue or give us any
meaningful ideas, as we have tried everything that came to mind.


We look forward to your reply.
Thank you, Marek Berith
  def iterate(
      sc: SparkContext,
      graph: graphx.Graph[GeneralVertex, EdgeProperty],
      metaGraph: graphx.Graph[GeneralVertex, EdgeProperty])
      : (graphx.Graph[GeneralVertex, EdgeProperty], graphx.Graph[GeneralVertex, EdgeProperty]) = {
    // displacements from the four force sources, each keyed by vertex id
    val attractiveDisplacement: VertexRDD[(VertexId, Vector)] =
      this.calculateAttractiveForces(graph)
    val repulsiveDisplacement: RDD[(VertexId, Vector)] =
      this.calculateRepulsiveForces(graph)
    val metaVertexDisplacement: RDD[(VertexId, Vector)] =
      this.calculateMetaVertexForces(graph, metaGraph.vertices)
    val metaEdgeDisplacement: RDD[(VertexId, Vector)] =
      this.calculateMetaEdgeForces(metaGraph)
    // combine the partial displacements and move the vertices
    val displacements: RDD[(VertexId, Vector)] = this.combineDisplacements(
      attractiveDisplacement,
      repulsiveDisplacement,
      metaVertexDisplacement,
      metaEdgeDisplacement)
    val newVertices: RDD[(VertexId, GeneralVertex)] =
      this.displaceVertices(graph, displacements)
    // build the next iteration's graph, reusing the old edges
    val newGraph = graphx.Graph(newVertices, graph.edges)
    // persist or checkpoint or cache? or something else?
    newGraph.persist()
    metaGraph.persist()
    (newGraph, metaGraph)
  }
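
One pattern worth trying here, as a sketch only (it assumes a checkpoint
directory is set once at startup and that an iteration counter is available;
the path and the every-10-iterations interval are illustrative): checkpoint the
graph periodically to truncate the growing lineage, force materialization, and
unpersist the previous graph.

    // once, at application startup (illustrative path)
    sc.setCheckpointDir("hdfs:///tmp/fr-checkpoints")

    // inside iterate(), after building newGraph ("iteration" is an assumed counter)
    newGraph.persist()
    if (iteration % 10 == 0) {
      newGraph.checkpoint()    // cuts the lineage so later iterations do not replay the whole DAG
    }
    newGraph.vertices.count()  // persist/checkpoint are lazy: force evaluation now
    newGraph.edges.count()
    graph.unpersist(blocking = false)  // drop the previous iteration's cached graph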

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: Help me learn about JOB TASK and DAG in Apache Spark

2023-04-01 Thread Mich Talebzadeh
Good stuff Khalid.

I have created a section in Apache Spark Community Stack called spark
foundation.  spark-foundation - Apache Spark Community - Slack
<https://app.slack.com/client/T04URTRBZ1R/C051CL5T1KL/thread/C0501NBTNQG-1680132989.091199>

I invite you to add your weblink to that section.

HTH
Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 1 Apr 2023 at 13:12, Khalid Mammadov 
wrote:

> Hey AN-TRUONG
>
> I have got some articles about this subject that should help.
> E.g.
> https://khalidmammadov.github.io/spark/spark_internals_rdd.html
>
> Also check other Spark Internals on web.
>
> Regards
> Khalid
>
> On Fri, 31 Mar 2023, 16:29 AN-TRUONG Tran Phan, 
> wrote:
>
>> Thank you for your information,
>>
>> I have tracked the spark history server on port 18080 and the spark UI on
>> port 4040. I see the result of these two tools as similar right?
>>
>> I want to know what each Task ID (Example Task ID 0, 1, 3, 4, 5, ) in
>> the images does, is it possible?
>> https://i.stack.imgur.com/Azva4.png
>>
>> Best regards,
>>
>> An - Truong
>>
>>
>> On Fri, Mar 31, 2023 at 9:38 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Are you familiar with spark GUI default on port 4040?
>>>
>>> have a look.
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies Limited
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Fri, 31 Mar 2023 at 15:15, AN-TRUONG Tran Phan <
>>> tr.phan.tru...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am learning about Apache Spark and want to know the meaning of each
>>>> Task created on the Jobs recorded on Spark history.
>>>>
>>>> For example, the application I write creates 17 jobs, in which job 0
>>>> runs for 10 minutes, there are 2384 small tasks and I want to learn about
>>>> the meaning of these 2384, is it possible?
>>>>
>>>> I found a picture of DAG in the Jobs and want to know the relationship
>>>> between DAG and Task, is it possible (Specifically from the attached file
>>>> DAG and 2384 tasks below)?
>>>>
>>>> Thank you very much, have a nice day everyone.
>>>>
>>>> Best regards,
>>>>
>>>> An-Trường.
>>>>
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>> --
>> Trân Trọng,
>>
>> An Trường.
>>
>


Re: Help me learn about JOB TASK and DAG in Apache Spark

2023-04-01 Thread Khalid Mammadov
Hey AN-TRUONG

I have got some articles about this subject that should help.
E.g.
https://khalidmammadov.github.io/spark/spark_internals_rdd.html

Also check other Spark Internals on web.

Regards
Khalid

On Fri, 31 Mar 2023, 16:29 AN-TRUONG Tran Phan, 
wrote:

> Thank you for your information,
>
> I have tracked the spark history server on port 18080 and the spark UI on
> port 4040. I see the result of these two tools as similar right?
>
> I want to know what each Task ID (Example Task ID 0, 1, 3, 4, 5, ) in
> the images does, is it possible?
> https://i.stack.imgur.com/Azva4.png
>
> Best regards,
>
> An - Truong
>
>
> On Fri, Mar 31, 2023 at 9:38 PM Mich Talebzadeh 
> wrote:
>
>> Are you familiar with spark GUI default on port 4040?
>>
>> have a look.
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 31 Mar 2023 at 15:15, AN-TRUONG Tran Phan <
>> tr.phan.tru...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am learning about Apache Spark and want to know the meaning of each
>>> Task created on the Jobs recorded on Spark history.
>>>
>>> For example, the application I write creates 17 jobs, in which job 0
>>> runs for 10 minutes, there are 2384 small tasks and I want to learn about
>>> the meaning of these 2384, is it possible?
>>>
>>> I found a picture of DAG in the Jobs and want to know the relationship
>>> between DAG and Task, is it possible (Specifically from the attached file
>>> DAG and 2384 tasks below)?
>>>
>>> Thank you very much, have a nice day everyone.
>>>
>>> Best regards,
>>>
>>> An-Trường.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
> --
> Trân Trọng,
>
> An Trường.
>


Re: Help me learn about JOB TASK and DAG in Apache Spark

2023-03-31 Thread Mich Talebzadeh
Yes, history refers to completed jobs; 4040 shows the running jobs.

You should have screenshots for the Executors and Stages tabs as well.

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 31 Mar 2023 at 16:17, AN-TRUONG Tran Phan 
wrote:

> Thank you for your information,
>
> I have tracked the spark history server on port 18080 and the spark UI on
> port 4040. I see the result of these two tools as similar right?
>
> I want to know what each Task ID (Example Task ID 0, 1, 3, 4, 5, ) in
> the images does, is it possible?
> https://i.stack.imgur.com/Azva4.png
>
> Best regards,
>
> An - Truong
>
>
> On Fri, Mar 31, 2023 at 9:38 PM Mich Talebzadeh 
> wrote:
>
>> Are you familiar with spark GUI default on port 4040?
>>
>> have a look.
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 31 Mar 2023 at 15:15, AN-TRUONG Tran Phan <
>> tr.phan.tru...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am learning about Apache Spark and want to know the meaning of each
>>> Task created on the Jobs recorded on Spark history.
>>>
>>> For example, the application I write creates 17 jobs, in which job 0
>>> runs for 10 minutes, there are 2384 small tasks and I want to learn about
>>> the meaning of these 2384, is it possible?
>>>
>>> I found a picture of DAG in the Jobs and want to know the relationship
>>> between DAG and Task, is it possible (Specifically from the attached file
>>> DAG and 2384 tasks below)?
>>>
>>> Thank you very much, have a nice day everyone.
>>>
>>> Best regards,
>>>
>>> An-Trường.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
> --
> Trân Trọng,
>
> An Trường.
>


Re: Help me learn about JOB TASK and DAG in Apache Spark

2023-03-31 Thread AN-TRUONG Tran Phan
Thank you for your information,

I have tracked the Spark history server on port 18080 and the Spark UI on
port 4040. I see the results of these two tools as similar, right?

I want to know what each Task ID (for example Task ID 0, 1, 3, 4, 5, ...) in
the images does. Is that possible?
https://i.stack.imgur.com/Azva4.png

Best regards,

An - Truong


On Fri, Mar 31, 2023 at 9:38 PM Mich Talebzadeh 
wrote:

> Are you familiar with spark GUI default on port 4040?
>
> have a look.
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 31 Mar 2023 at 15:15, AN-TRUONG Tran Phan <
> tr.phan.tru...@gmail.com> wrote:
>
>> Hi,
>>
>> I am learning about Apache Spark and want to know the meaning of each
>> Task created on the Jobs recorded on Spark history.
>>
>> For example, the application I write creates 17 jobs, in which job 0 runs
>> for 10 minutes, there are 2384 small tasks and I want to learn about the
>> meaning of these 2384, is it possible?
>>
>> I found a picture of DAG in the Jobs and want to know the relationship
>> between DAG and Task, is it possible (Specifically from the attached file
>> DAG and 2384 tasks below)?
>>
>> Thank you very much, have a nice day everyone.
>>
>> Best regards,
>>
>> An-Trường.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Trân Trọng,

An Trường.


Re: Help me learn about JOB TASK and DAG in Apache Spark

2023-03-31 Thread Mich Talebzadeh
Are you familiar with spark GUI default on port 4040?

have a look.

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 31 Mar 2023 at 15:15, AN-TRUONG Tran Phan 
wrote:

> Hi,
>
> I am learning about Apache Spark and want to know the meaning of each Task
> created on the Jobs recorded on Spark history.
>
> For example, the application I write creates 17 jobs, in which job 0 runs
> for 10 minutes, there are 2384 small tasks and I want to learn about the
> meaning of these 2384, is it possible?
>
> I found a picture of DAG in the Jobs and want to know the relationship
> between DAG and Task, is it possible (Specifically from the attached file
> DAG and 2384 tasks below)?
>
> Thank you very much, have a nice day everyone.
>
> Best regards,
>
> An-Trường.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Mapping stages in DAG to line of code in pyspark

2021-04-18 Thread Dhruv Kumar
Hi

I am using PySpark for writing Spark queries. My research project requires me
to accurately measure the latency of each and every operator/stage in the
query. I can make some guesses, but I am unable to exactly map the stages
(shown in the DAG on the Spark UI) to the exact lines in my PySpark code.

Can someone help? I can share some examples if required.

Thanks
Dhruv 

--
Dhruv Kumar
PhD Candidate
Computer Science and Engineering
University of Minnesota
www.dhruvkumar.me <http://dhruvkumar.me/>





Re: Where is the DAG stored before catalyst gets it?

2018-10-06 Thread Jacek Laskowski
Hi Jean Georges,

> I am assuming it is still in the master and when catalyst is finished it
sends the tasks to the workers.

Sorry to be that direct, but the sentence does not make much sense to me.
Again, very sorry for saying so in the very first sentence; since I know
Jean Georges, I allowed myself more openness.

In other words, "the master" part seems to suggest that you use a Spark
Standalone cluster. Correct? Other cluster managers use different naming for
the master/manager node.

"when catalyst is finished" is really tough to understand. You mean once all
the optimizations are applied and the query is ready for execution? The
final output of the "query execution pipeline" is to generate an RDD with
the right code for execution. At this phase, the query is more an RDD than a
Dataset.

"it sends the tasks to the workers." Since we're talking about an RDD, this
abstraction is planned as a set of tasks (one per partition of the RDD). And
yes, the tasks are sent out over the wire to executors. It has been like
this since Spark 1.0 (and even earlier).
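
A tiny illustration of that pipeline in the Spark shell, from Dataset to
optimized plan to RDD to tasks (the query itself is just an example):

val df = spark.range(0, 1000000).filter($"id" % 2 === 0)

df.explain(true)                   // parsed, analyzed, optimized and physical plans
val rdd = df.queryExecution.toRdd  // the RDD the physical plan is compiled into
println(rdd.getNumPartitions)      // the final stage runs one task per partition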

Hope I helped a bit.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski


On Fri, Oct 5, 2018 at 12:36 AM Jean Georges Perrin  wrote:

> Hi,
>
> I am assuming it is still in the master and when catalyst is finished it
> sends the tasks to the workers.
>
> Correct?
>
> tia
>
> jg
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Where is the DAG stored before catalyst gets it?

2018-10-04 Thread Jean Georges Perrin
Hi, 

I am assuming it is still in the master and when catalyst is finished it sends 
the tasks to the workers.

Correct?

tia

jg
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark ML DAG Pipelines

2017-09-07 Thread Srikanth Sampath
Hi Spark Experts,

Can someone point me to some examples for non-linear (DAG) ML pipelines.
That would be of great help.
Thanks much in advance
-Srikanth


Re: [Spark Streaming] DAG Output Processing mechanism

2017-05-29 Thread Nipun Arora
Sending out the message again... Hopefully someone can clarify :)

I would like some clarification on the execution model for spark streaming. 

Broadly, I am trying to understand if output operations in a DAG are only
processed after all intermediate operations are finished for all parts of
the DAG. 

Let me give an example: 

I have a dstream, A; I do map operations on this dstream and create two
different dstreams, B and C, such that

A ---> B ---> (some operations) ---> kafka output 1
  \--> C ---> (some operations) ---> kafka output 2

I want to understand whether kafka output 1 and kafka output 2 will wait for
all operations to finish on B and C before sending an output, or whether
they will simply send an output as soon as the ops in B and C are done.

What kind of synchronization guarantees are there?
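
For reference, a rough sketch of the topology described above (the stream
name, the map functions and the Kafka sends are placeholders, not the original
code). Each foreachRDD is registered as a separate output operation, so every
batch produces one job per output; by default those jobs are executed one
after another within the batch:

import org.apache.spark.streaming.dstream.DStream

def wire(dstreamA: DStream[String]): Unit = {
  val dstreamB = dstreamA.map(_.toUpperCase)  // stands in for B's "some operations"
  val dstreamC = dstreamA.map(_.reverse)      // stands in for C's "some operations"

  dstreamB.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      // create or reuse a Kafka producer here and send each record to kafka output 1
      records.foreach(r => println(r))  // placeholder for the actual send
    }
  }
  dstreamC.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      // create or reuse a Kafka producer here and send each record to kafka output 2
      records.foreach(r => println(r))  // placeholder for the actual send
    }
  }
}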



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-DAG-Output-Processing-mechanism-tp28713p28715.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark Streaming] DAG Output Processing mechanism

2017-05-28 Thread Nipun Arora
Apogies - Resending as the previous mail went with some unnecessary copy
paste.

I would like some clarification on the execution model for spark streaming.

Broadly, I am trying to understand if output operations in a DAG are only
processed after all intermediate operations are finished for all parts of
the DAG.

Let me give an example:

I have a dstream -A , I do map operations on this dstream and create two
different dstreams -B and C such that

A ---> B -> (some operations) ---> kafka output 1
 \> C---> ( some operations) --> kafka output 2

I want to understand will kafka output 1 and kafka output 2 wait for all
operations to finish on B and C before sending an output, or will they
simply send an output as soon as the ops in B and C are done.

What kind of synchronization guarantees are there?

On Sun, May 28, 2017 at 9:59 AM, Nipun Arora 
wrote:

> I would like some clarification on the execution model for spark streaming.
>
> Broadly, I am trying to understand if output operations in a DAG are only
> processed after all intermediate operations are finished for all parts of
> the DAG.
>
> Let me give an example:
>
> I have a dstream -A , I do map operations on this dstream and create two
> different dstreams -B and C such that
>
> A ---> B -> (some operations) ---> kafka output 1
>  \> C---> ( some operations) --> kafka output 2
>
> I want to understand will kafka output 1 and kafka output 2 wait for all
> operations to finish on B and C before sending an output, or will they
> simply send an output as soon as the ops in B and C are done.
>
> What kind of synchronization guarantees are there?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-Streaming-DAG-Output-
> Processing-mechanism-tp28713.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


[Spark Streaming] DAG Output Processing mechanism

2017-05-28 Thread Nipun Arora
I would like some clarification on the execution model for spark streaming.

Broadly, I am trying to understand if output operations in a DAG are only
processed after all intermediate operations are finished for all parts of
the DAG.

Let me give an example:

I have a dstream -A , I do map operations on this dstream and create two
different dstreams -B and C such that

A ---> B -> (some operations) ---> kafka output 1   
 \> C---> ( some operations) --> kafka output 2

I want to understand will kafka output 1 and kafka output 2 wait for all
operations to finish on B and C before sending an output, or will they
simply send an output as soon as the ops in B and C are done.

What kind of synchronization guarantees are there?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-DAG-Output-Processing-mechanism-tp28713.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[Spark Streaming] DAG Execution Model Clarification

2017-05-26 Thread Nipun Arora
Hi,

I would like some clarification on the execution model for spark streaming.

Broadly, I am trying to understand if output operations in a DAG are only
processed after all intermediate operations are finished for all parts of
the DAG.

Let me give an example:

I have a dstream -A , I do map operations on this dstream and create two
different dstreams -B and C such that

A ---> B -> (some operations) ---> kafka output 1
  \> C---> ( some operations) --> kafka output 2

I want to understand will kafka output 1 and kafka output 2 wait for all
operations to finish on B and C before sending an output, or will they
simply send an output as soon as the ops in B and C are done.

What kind of synchronization guarantees are there?

Thanks
Nipun


How to generate stage for this RDD DAG please?

2017-05-23 Thread ??????????
Hi all,


I read some papers about stages, and I know about narrow dependencies and
shuffle dependencies.

Regarding the RDD DAG below, how does Spark generate the stage DAG, please?
And is this RDD DAG legal, please?
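
As a generic illustration, independent of the attached DAG: Spark walks the
final RDD's dependencies backwards and starts a new stage at every shuffle
dependency, while narrow dependencies are pipelined into the same stage.
toDebugString shows those boundaries as indentation, as in this small sketch:

val words  = sc.textFile("input.txt").flatMap(_.split(" "))    // narrow: pipelined in one stage
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)         // shuffle: starts a new stage
val upper  = counts.map { case (w, n) => (w.toUpperCase, n) }  // narrow: stays in the second stage
println(upper.toDebugString)  // the indentation in the output marks the shuffle/stage boundary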

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: DAG Visualization option is missing on Spark Web UI

2017-01-30 Thread Md. Rezaul Karim
Hi Mark,

That worked for me! Thanks a million.

Regards,
_
*Md. Rezaul Karim*, BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html
<http://139.59.184.114/index.html>

On 29 January 2017 at 01:53, Mark Hamstra  wrote:

> Try selecting a particular Job instead of looking at the summary page for
> all Jobs.
>
> On Sat, Jan 28, 2017 at 4:25 PM, Md. Rezaul Karim <
> rezaul.ka...@insight-centre.org> wrote:
>
>> Hi Jacek,
>>
>> I tried accessing Spark web UI on both Firefox and Google Chrome browsers
>> with ad blocker enabled. I do see other options like* User, Total
>> Uptime, Scheduling Mode, **Active Jobs, Completed Jobs and* Event
>> Timeline. However, I don't see an option for DAG visualization.
>>
>> Please note that I am experiencing the same issue with Spark 2.x (i.e.
>> 2.0.0, 2.0.1, 2.0.2 and 2.1.0). Refer the attached screenshot of the UI
>> that I am seeing on my machine:
>>
>> [image: Inline images 1]
>>
>>
>> Please suggest.
>>
>>
>>
>>
>> Regards,
>> _
>> *Md. Rezaul Karim*, BSc, MSc
>> PhD Researcher, INSIGHT Centre for Data Analytics
>> National University of Ireland, Galway
>> IDA Business Park, Dangan, Galway, Ireland
>> Web: http://www.reza-analytics.eu/index.html
>> <http://139.59.184.114/index.html>
>>
>> On 28 January 2017 at 18:51, Jacek Laskowski  wrote:
>>
>>> Hi,
>>>
>>> Wonder if you have any adblocker enabled in your browser? Is this the
>>> only version giving you this behavior? All Spark jobs have no
>>> visualization?
>>>
>>> Jacek
>>>
>>> On 28 Jan 2017 7:03 p.m., "Md. Rezaul Karim" <
>>> rezaul.ka...@insight-centre.org> wrote:
>>>
>>> Hi All,
>>>
>>> I am running a Spark job on my local machine written in Scala with Spark
>>> 2.1.0. However, I am not seeing any option of "*DAG Visualization*" at 
>>> http://localhost:4040/jobs/
>>>
>>>
>>> Suggestion, please.
>>>
>>>
>>>
>>>
>>> Regards,
>>> _
>>> *Md. Rezaul Karim*, BSc, MSc
>>> PhD Researcher, INSIGHT Centre for Data Analytics
>>> National University of Ireland, Galway
>>> IDA Business Park, Dangan, Galway, Ireland
>>> Web: http://www.reza-analytics.eu/index.html
>>> <http://139.59.184.114/index.html>
>>>
>>>
>>>
>>
>


Re: DAG Visualization option is missing on Spark Web UI

2017-01-28 Thread Mark Hamstra
Try selecting a particular Job instead of looking at the summary page for
all Jobs.

On Sat, Jan 28, 2017 at 4:25 PM, Md. Rezaul Karim <
rezaul.ka...@insight-centre.org> wrote:

> Hi Jacek,
>
> I tried accessing Spark web UI on both Firefox and Google Chrome browsers
> with ad blocker enabled. I do see other options like* User, Total Uptime,
> Scheduling Mode, **Active Jobs, Completed Jobs and* Event Timeline.
> However, I don't see an option for DAG visualization.
>
> Please note that I am experiencing the same issue with Spark 2.x (i.e.
> 2.0.0, 2.0.1, 2.0.2 and 2.1.0). Refer the attached screenshot of the UI
> that I am seeing on my machine:
>
> [image: Inline images 1]
>
>
> Please suggest.
>
>
>
>
> Regards,
> _
> *Md. Rezaul Karim*, BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> <http://139.59.184.114/index.html>
>
> On 28 January 2017 at 18:51, Jacek Laskowski  wrote:
>
>> Hi,
>>
>> Wonder if you have any adblocker enabled in your browser? Is this the
>> only version giving you this behavior? All Spark jobs have no
>> visualization?
>>
>> Jacek
>>
>> On 28 Jan 2017 7:03 p.m., "Md. Rezaul Karim" <
>> rezaul.ka...@insight-centre.org> wrote:
>>
>> Hi All,
>>
>> I am running a Spark job on my local machine written in Scala with Spark
>> 2.1.0. However, I am not seeing any option of "*DAG Visualization*" at 
>> http://localhost:4040/jobs/
>>
>>
>> Suggestion, please.
>>
>>
>>
>>
>> Regards,
>> _
>> *Md. Rezaul Karim*, BSc, MSc
>> PhD Researcher, INSIGHT Centre for Data Analytics
>> National University of Ireland, Galway
>> IDA Business Park, Dangan, Galway, Ireland
>> Web: http://www.reza-analytics.eu/index.html
>> <http://139.59.184.114/index.html>
>>
>>
>>
>


Re: DAG Visualization option is missing on Spark Web UI

2017-01-28 Thread Md. Rezaul Karim
Hi Jacek,

I tried accessing Spark web UI on both Firefox and Google Chrome browsers
with ad blocker enabled. I do see other options like* User, Total Uptime,
Scheduling Mode, **Active Jobs, Completed Jobs and* Event Timeline.
However, I don't see an option for DAG visualization.

Please note that I am experiencing the same issue with Spark 2.x (i.e.
2.0.0, 2.0.1, 2.0.2 and 2.1.0). Refer the attached screenshot of the UI
that I am seeing on my machine:

[image: Inline images 1]


Please suggest.




Regards,
_
*Md. Rezaul Karim*, BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html
<http://139.59.184.114/index.html>

On 28 January 2017 at 18:51, Jacek Laskowski  wrote:

> Hi,
>
> Wonder if you have any adblocker enabled in your browser? Is this the only
> version giving you this behavior? All Spark jobs have no visualization?
>
> Jacek
>
> On 28 Jan 2017 7:03 p.m., "Md. Rezaul Karim"  org> wrote:
>
> Hi All,
>
> I am running a Spark job on my local machine written in Scala with Spark
> 2.1.0. However, I am not seeing any option of "*DAG Visualization*" at 
> http://localhost:4040/jobs/
>
>
> Suggestion, please.
>
>
>
>
> Regards,
> _
> *Md. Rezaul Karim*, BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> <http://139.59.184.114/index.html>
>
>
>


Re: DAG Visualization option is missing on Spark Web UI

2017-01-28 Thread Jacek Laskowski
Hi,

Wonder if you have any adblocker enabled in your browser? Is this the only
version giving you this behavior? All Spark jobs have no visualization?

Jacek

On 28 Jan 2017 7:03 p.m., "Md. Rezaul Karim" <
rezaul.ka...@insight-centre.org> wrote:

Hi All,

I am running a Spark job on my local machine written in Scala with Spark
2.1.0. However, I am not seeing any option of "*DAG Visualization*" at
http://localhost:4040/jobs/


Suggestion, please.




Regards,
_
*Md. Rezaul Karim*, BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html
<http://139.59.184.114/index.html>


DAG Visualization option is missing on Spark Web UI

2017-01-28 Thread Md. Rezaul Karim
Hi All,

I am running a Spark job on my local machine written in Scala with Spark
2.1.0. However, I am not seeing any option of "*DAG Visualization*" at
http://localhost:4040/jobs/


Suggestion, please.




Regards,
_
*Md. Rezaul Karim*, BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html
<http://139.59.184.114/index.html>


Re: SparkSQL DAG generation , DAG optimization , DAG execution

2016-09-10 Thread Mich Talebzadeh
right let us simplify this.

can you run the whole thing *once* only and send dag execution output from
UI?

you can use snipping tool to take the image.

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 10 September 2016 at 09:59, Rabin Banerjee 
wrote:

> Hi ,
>
>
>1. You are doing some analytics I guess?  *YES*
>2. It is almost impossible to guess what is happening except that you
>are looping 50 times over the same set of sql?  *I am Not Looping any
>SQL, All SQLs are called exactly once , which requires output from prev
>SQL.*
>3. Your sql step n depends on step n-1. So spark cannot get rid of 1
>- n steps, *TRUE, But If I have N SQL and all i th SQL is
>dependent upon i-1 , how spark optimize the memory ? is it like for each i
>th sql it will start execution from STAGE 0 *
>4. you are not storing anything in  memory(no cache, no persist), so
>all memory is used for the execution , IF Spark is not storing anything in
>memory , then when it is executing *i th sql it will start execution
>from STAGE 0 i.e starting from file read *
>5. What happens when you run it only once? How much memory is used
>    (look at UI page, 4040 by default) , ? *I checked Spark UI DAG , so
>many file reads , Why ?*
>6.  What Spark mode is being used (Local, Standalone, Yarn) ? *Yarn*
>7. OOM could be anything depending on how much you are allocating to
>your driver memory in spark-submit ? *Driver and executor memory is
>set as 4gb , input data size is less than 1 GB, NO of executor is 5.*
>
> *I am still bit confused about spark's execution plan on multiple SQL with
> only one action .*
>
> *Is it executing each SQL separately and trying to store intermediate
> result in memory which is causing OOM/GC overhead ?*
> *And Still my questions are ...*
>
> *1. Will Spark optimize multiple SQL queries into one single plysical plan
> Which will at least will not execute same stage twice , read file once... ?*
> *2. In DAG I can see a lot of file read and lot of stages , Why ? I only
> called action once ? Why in multiple stage Spark is again starting from
> file reading ?*
> *3. Is every SQL will execute and its intermediate result will be stored
> in memory ?*
> *4. What is something that causing OOM and GC overhead here ?*
> *5. What is optimization that could be taken care of ? *
>
>
> On Sat, Sep 10, 2016 at 11:35 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi
>>
>>1. You are doing some analytics I guess?
>>2. It is almost impossible to guess what is happening except that you
>>are looping 50 times over the same set of sql?
>>3. Your sql step n depends on step n-1. So spark cannot get rid of 1
>>-n steps
>>4. you are not storing anything in  memory(no cache, no persist), so
>>all memory is used for the execution
>>5. What happens when you run it only once? How much memory is used
>>(look at UI page, 4040 by default)
>>6.  What Spark mode is being used (Local, Standalone, Yarn)
>>7. OOM could be anything depending on how much you are allocating to
>>your driver memory in spark-submit
>>
>> HTH
>>
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 10 September 2016 at 06:21, Rabin Banerjee <
>> dev.rabin.baner...@gmail.com> wrote:
>>
>>> HI All,
>>>
>>>  I am writing and executing a Spark Batch program whic

Re: SparkSQL DAG generation , DAG optimization , DAG execution

2016-09-10 Thread Rabin Banerjee
Hi ,


   1. You are doing some analytics I guess?  *YES*
   2. It is almost impossible to guess what is happening except that you
   are looping 50 times over the same set of sql?  *I am not looping any
   SQL; all SQLs are called exactly once, and each one requires the output
   of the previous SQL.*
   3. Your sql step n depends on step n-1. So spark cannot get rid of 1
   - n steps. *TRUE. But if I have N SQLs and every i-th SQL depends on the
   (i-1)-th, how does Spark optimize the memory? Does it start execution
   from STAGE 0 for every i-th SQL?*
   4. you are not storing anything in memory (no cache, no persist), so all
   memory is used for the execution. *If Spark is not storing anything in
   memory, then when it executes the i-th SQL it will start execution from
   STAGE 0, i.e. starting from the file read.*
   5. What happens when you run it only once? How much memory is used (look
   at UI page, 4040 by default)? *I checked the Spark UI DAG: so many file
   reads. Why?*
   6. What Spark mode is being used (Local, Standalone, Yarn)? *Yarn*
   7. OOM could be anything depending on how much you are allocating to
   your driver memory in spark-submit? *Driver and executor memory are set
   to 4 GB, the input data size is less than 1 GB, and the number of
   executors is 5.*

*I am still a bit confused about Spark's execution plan for multiple SQLs
with only one action.*

*Is it executing each SQL separately and trying to store the intermediate
results in memory, which is causing the OOM/GC overhead?*
*And still my questions are:*

*1. Will Spark optimize multiple SQL queries into one single physical plan,
which will at least not execute the same stage twice and will read each
file only once?*
*2. In the DAG I can see a lot of file reads and a lot of stages. Why? I
only called the action once. Why, in multiple stages, is Spark again
starting from reading the files?*
*3. Will every SQL execute and have its intermediate result stored in
memory?*
*4. What is causing the OOM and GC overhead here?*
*5. What optimizations could be taken care of?*


On Sat, Sep 10, 2016 at 11:35 AM, Mich Talebzadeh  wrote:

> Hi
>
>1. You are doing some analytics I guess?
>2. It is almost impossible to guess what is happening except that you
>are looping 50 times over the same set of sql?
>3. Your sql step n depends on step n-1. So spark cannot get rid of 1
>-n steps
>4. you are not storing anything in  memory(no cache, no persist), so
>all memory is used for the execution
>5. What happens when you run it only once? How much memory is used
>(look at UI page, 4040 by default)
>6.  What Spark mode is being used (Local, Standalone, Yarn)
>7. OOM could be anything depending on how much you are allocating to
>your driver memory in spark-submit
>
> HTH
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 10 September 2016 at 06:21, Rabin Banerjee <
> dev.rabin.baner...@gmail.com> wrote:
>
>> HI All,
>>
>>  I am writing and executing a Spark Batch program which only use
>> SPARK-SQL , But it is taking lot of time and finally giving GC overhead .
>>
>> Here is the program ,
>>
>> 1.Read 3 files ,one medium size and 2 small files, and register them as
>> DF.
>> 2.
>>  fire sql with complex aggregation and windowing .
>>  register result as DF.
>>
>> 3.  .Repeat step 2 almost 50 times .so 50 sql .
>>
>> 4. All SQLs are sequential , i.e next step requires prev step result .
>>
>> 5. Finally save the final DF .(This is the only action called).
>>
>> Note ::
>>
>> 1. I haven't persists the intermediate DF , as I think Spark will
>> optimize multiple SQL into one physical execution plan .
>> 2. Executor memory and Driver memory is set as 4gb which is too high as
>> data size is in MB.
>>
>> Questions ::
>>
>> 1. Will Spark optimize multiple SQL queries into one single plysical plan
>> ?
>> 2. In DAG I can see a lot of file read and lot of stages , Why ? I only
>> called action once ?
>> 3. Is every SQL will execute and its intermediate result will be stored
>> in memory ?
>> 4. What is something that causing OOM and GC overhead here ?
>> 5. What is optimization that could be taken care of ?
>>
>> Spark Version 1.5.x
>>
>>
>> Thanks in advance .
>> Rabin
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>


Re: SparkSQL DAG generation , DAG optimization , DAG execution

2016-09-09 Thread Mich Talebzadeh
Hi

   1. You are doing some analytics I guess?
   2. It is almost impossible to guess what is happening except that you
   are looping 50 times over the same set of sql?
   3. Your sql step n depends on step n-1. So spark cannot get rid of 1 -n
   steps
   4. you are not storing anything in  memory(no cache, no persist), so all
   memory is used for the execution
   5. What happens when you run it only once? How much memory is used (look
   at UI page, 4040 by default)
   6.  What Spark mode is being used (Local, Standalone, Yarn)
   7. OOM could be anything depending on how much you are allocating to
   your driver memory in spark-submit

HTH





Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 10 September 2016 at 06:21, Rabin Banerjee 
wrote:

> HI All,
>
>  I am writing and executing a Spark Batch program which only use SPARK-SQL
> , But it is taking lot of time and finally giving GC overhead .
>
> Here is the program ,
>
> 1.Read 3 files ,one medium size and 2 small files, and register them as DF.
> 2.
>  fire sql with complex aggregation and windowing .
>  register result as DF.
>
> 3.  .Repeat step 2 almost 50 times .so 50 sql .
>
> 4. All SQLs are sequential , i.e next step requires prev step result .
>
> 5. Finally save the final DF .(This is the only action called).
>
> Note ::
>
> 1. I haven't persists the intermediate DF , as I think Spark will optimize
> multiple SQL into one physical execution plan .
> 2. Executor memory and Driver memory is set as 4gb which is too high as
> data size is in MB.
>
> Questions ::
>
> 1. Will Spark optimize multiple SQL queries into one single plysical plan ?
> 2. In DAG I can see a lot of file read and lot of stages , Why ? I only
> called action once ?
> 3. Is every SQL will execute and its intermediate result will be stored in
> memory ?
> 4. What is something that causing OOM and GC overhead here ?
> 5. What is optimization that could be taken care of ?
>
> Spark Version 1.5.x
>
>
> Thanks in advance .
> Rabin
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>


SparkSQL DAG generation , DAG optimization , DAG execution

2016-09-09 Thread Rabin Banerjee
HI All,

 I am writing and executing a Spark batch program which only uses Spark SQL,
but it is taking a lot of time and finally giving a GC overhead error.

Here is the program:

1. Read 3 files (one medium-sized and 2 small ones) and register them as DFs.
2. Fire SQL with complex aggregation and windowing, and register the result
   as a DF.

3. Repeat step 2 almost 50 times, so 50 SQLs.

4. All SQLs are sequential, i.e. the next step requires the previous step's
   result.

5. Finally save the final DF (this is the only action called).

Note:

1. I haven't persisted the intermediate DFs, as I think Spark will optimize
   multiple SQLs into one physical execution plan.
2. Executor memory and driver memory are set to 4 GB, which is too high as
   the data size is in MB.

Questions:

1. Will Spark optimize multiple SQL queries into one single physical plan?
2. In the DAG I can see a lot of file reads and a lot of stages. Why? I only
   called the action once.
3. Will every SQL execute and have its intermediate result stored in memory?
4. What is causing the OOM and GC overhead here?
5. What optimizations could be taken care of?

Spark Version 1.5.x


Thanks in advance .
Rabin
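
For reference, a minimal sketch of the kind of pipeline described above
(Spark 1.5-era DataFrame API; the temp table name, the SQL list and the
every-10-steps interval are illustrative assumptions), with an explicit
persist and materialization every few steps so that step n does not replay
all previous steps from the original file reads:

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.storage.StorageLevel

def runChain(sqlContext: SQLContext, inputDF: DataFrame, sqls: Seq[String]): DataFrame = {
  var current = inputDF
  var lastCached: Option[DataFrame] = None
  sqls.zipWithIndex.foreach { case (sql, i) =>
    current.registerTempTable("step")     // each SQL in the chain reads from "step"
    current = sqlContext.sql(sql)
    if (i % 10 == 0) {                    // cut the plan every few steps
      current.persist(StorageLevel.MEMORY_AND_DISK)
      current.count()                     // force materialization of this step
      lastCached.foreach(_.unpersist())   // release the previously cached step
      lastCached = Some(current)
    }
  }
  current                                 // save the final DF (the single action) outside
}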


DAG of Spark Sort application spanning two jobs

2016-05-30 Thread alvarobrandon
I've written a very simple Sort scala program with Spark.

import org.apache.spark.{SparkConf, SparkContext}

object Sort {

  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      // usage string reconstructed to match how args are used below
      System.err.println("Usage: Sort <InputFile> <OutputFile> [<Splits>]")
      System.exit(1)
    }

    val conf = new SparkConf().setAppName("BigDataBench Sort")
    val spark = new SparkContext(conf)
    val logger = new JobPropertiesLogger(spark, "/home/abrandon/log.csv")
    val filename = args(0)
    val save_file = args(1)
    var splits = spark.defaultMinPartitions
    if (args.length > 2) {
      splits = args(2).toInt
    }
    val lines = spark.textFile(filename, splits)
    logger.start_timer()
    // pair each line with a dummy value so the lines can be sorted as keys
    val data_map = lines.map(line => (line, 1))

    val result = data_map.sortByKey().map { line => line._1 }
    logger.stop_timer()
    logger.write_log("Sort By Key: Sort App")
    result.saveAsTextFile(save_file)

    println("Result has been saved to: " + save_file)
  }

}


Now, I was thinking that since there is only one wide transformation
("sortByKey"), two stages would be spanned. However, I see two jobs, with one
stage in Job 0 and two stages for Job 1. Am I missing something? What I don't
get is the first stage of the second job: it seems to do the same work as the
stage of Job 0.

<http://apache-spark-user-list.1001560.n3.nabble.com/file/n27047/cbKDZ.png> 
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n27047/GXIkS.png> 
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n27047/H9LXF.png> 
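
One way to tie what the UI shows back to the code (a small sketch, not part
of the original program): label the action with a job description and dump
the RDD lineage just before saving, so each job on the Jobs page and its DAG
can be matched to this exact spot.

spark.setJobDescription("Sort app: sortByKey + saveAsTextFile")  // shows up on the Jobs page
println(result.toDebugString)  // prints the RDD lineage, including the shuffle boundary
result.saveAsTextFile(save_file)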





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DAG-of-Spark-Sort-application-spanning-two-jobs-tp27047.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



DAG Pipelines?

2016-05-04 Thread Cesar Flores
I read on the ml-guide page (
http://spark.apache.org/docs/latest/ml-guide.html#details) that it is
possible to construct DAG Pipelines. Unfortunately, there is no example to
explain under which use cases this may be useful.

*Can someone give me an example or use case where this functionality may be
useful?*
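
For context, a sketch of one shape such a DAG pipeline can take (the column
names and stages below are illustrative, not taken from the guide): two
independent feature branches, a text branch and a categorical branch, that
only meet at a VectorAssembler feeding a single classifier. The stages are
still passed as an ordered array, but the array only needs to be a
topological order of the column-dependency DAG.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, StringIndexer, Tokenizer, VectorAssembler}

// text -> Tokenizer -> HashingTF ----\
//                                     VectorAssembler -> LogisticRegression
// category -> StringIndexer ---------/
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("textFeatures")
val indexer = new StringIndexer().setInputCol("category").setOutputCol("categoryIndex")
val assembler = new VectorAssembler()
  .setInputCols(Array("textFeatures", "categoryIndex"))
  .setOutputCol("features")
val lr = new LogisticRegression().setFeaturesCol("features").setLabelCol("label")

// any topological order of the DAG is a valid stage list
val pipeline = new Pipeline().setStages(Array(tokenizer, indexer, hashingTF, assembler, lr))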



Thanks
-- 
Cesar Flores


the "DAG Visualiztion" in 1.6 not works fine here

2016-03-15 Thread charles li
Sometimes it just shows several *black dots*, and sometimes it cannot show
the entire graph.

Did anyone meet this before, and how did you fix it?




​
​


-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


streaming application redundant dag stage execution/performance/caching

2016-02-16 Thread krishna ramachandran
We have a streaming application containing approximately 12 stages every
batch, running in streaming mode (4 sec batches). Each stage persists
output to cassandra

the pipeline stages
stage 1

---> receive Stream A --> map --> filter -> (union with another stream B)
--> map --> groupbykey --> transform --> reducebykey --> map

we go thro' few more stages of transforms and save to database.

Around stage 5, we union the output of Dstream from stage 1 (in red) with
another stream (generated by split during stage 2) and save that state

It appears that the whole execution thus far is repeated, which is redundant
(I can see this in the execution graph and also in performance -> processing
time). Processing time per batch nearly doubles or triples.

This additional and redundant processing causes each batch to run as much as
2.5 times slower compared to runs without the union; for most batches the
union does not alter the original DStream (it is a union with an empty set).
If I cache the DStream (the red block output), performance improves
substantially, but we hit out-of-memory errors within a few hours.

What is the recommended way to cache/unpersist in such a scenario? There is
no DStream-level "unpersist".

setting "spark.streaming.unpersist" to true and
streamingContext.remember("duration") did not help. Still seeing out of
memory errors
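
A minimal sketch of the caching in question (stage1Output is an illustrative
name for the stage-1 DStream that gets unioned later); one variant worth
trying is a serialized storage level that can spill to disk rather than
holding everything on the heap:

import org.apache.spark.storage.StorageLevel

// cache the stream that is reused by the union so its lineage is not recomputed,
// but allow cached blocks to spill to disk under memory pressure
stage1Output.persist(StorageLevel.MEMORY_AND_DISK_SER)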

Krishna


RE: Question on Spark architecture and DAG

2016-02-12 Thread Mich Talebzadeh
Thanks Andy much appreciated

 

Mich Talebzadeh

 

LinkedIn
<https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABU
rV8Pw>
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUr
V8Pw

 

 <http://talebzadehmich.wordpress.com/> http://talebzadehmich.wordpress.com

 

NOTE: The information in this email is proprietary and confidential. This
message is for the designated recipient only, if you are not the intended
recipient, you should destroy it immediately. Any information in this
message shall not be understood as given or endorsed by Peridale Technology
Ltd, its subsidiaries or their employees, unless expressly so stated. It is
the responsibility of the recipient to ensure that this email is virus free,
therefore neither Peridale Technology Ltd, its subsidiaries nor their
employees accept any responsibility.

 

 

From: Andy Davidson [mailto:a...@santacruzintegration.com] 
Sent: 12 February 2016 21:17
To: Mich Talebzadeh ; user@spark.apache.org
Subject: Re: Question on Spark architecture and DAG

 

 

 

From: Mich Talebzadeh <m...@peridale.co.uk>
Date: Thursday, February 11, 2016 at 2:30 PM
To: "user @spark" <user@spark.apache.org>
Subject: Question on Spark architecture and DAG

 

Hi,

I have used Hive on Spark engine and of course Hive tables and its pretty
impressive comparing Hive using MR engine.

 

Let us assume that I use spark shell. Spark shell is a client that connects
to spark master running on a host and port like below

spark-shell --master spark://50.140.197.217:7077:

Ok once I connect I create an RDD to read a text file:

val oralog = sc.textFile("/test/alert_mydb.log")

I then search for word Errors in that file

oralog.filter(line => line.contains("Errors")).collect().foreach(line =>
println(line))

 

Questions:

 

1.  In order to display the lines (the result set) containing word
"Errors", the content of the file (i.e. the blocks on HDFS) need to be read
into memory. Is my understanding correct that as per RDD notes those blocks
from the file will be partitioned across the cluster and each node will have
its share of blocks in memory?

 

 

Typically results are written to disk. For example look at
rdd.saveAsTextFile(). You can also use "collect" to copy the RDD data into
the drivers local memory. You need to be careful that all the data will fit
in memory.

 

2.   
3.  Once the result is returned back they need to be sent to the client
that has made the connection to master. I guess this is a simple TCP
operation much like any relational database sending the result back?

 

 

I run several spark streaming apps. One collects data, does some clean up
and publishes the results to down stream systems using activeMQ. Some of our
other apps just write on a socket

 

4.   
5.  Once the results are returned if no request has been made to keep
the data in memory, those blocks in memory will be discarded?

 

There are couple of thing to consider, for example if your batch job
completes all memory is returned. Programaticaly you make RDD persistent or
cause them to be cached in memory

 

6.   
7.  Regardless of the storage block size on disk (128MB, 256MB etc), the
memory pages are 2K in relational databases? Is this the case in Spark as
well?

Thanks,

 Mich Talebzadeh

 

LinkedIn
<https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABU
rV8Pw>
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUr
V8Pw

 

 <http://talebzadehmich.wordpress.com/> http://talebzadehmich.wordpress.com

 

NOTE: The information in this email is proprietary and confidential. This
message is for the designated recipient only, if you are not the intended
recipient, you should destroy it immediately. Any information in this
message shall not be understood as given or endorsed by Peridale Technology
Ltd, its subsidiaries or their employees, unless expressly so stated. It is
the responsibility of the recipient to ensure that this email is virus free,
therefore neither Peridale Technology Ltd, its subsidiaries nor their
employees accept any responsibility.

 

 



Re: Question on Spark architecture and DAG

2016-02-12 Thread Andy Davidson


From:  Mich Talebzadeh 
Date:  Thursday, February 11, 2016 at 2:30 PM
To:  "user @spark" 
Subject:  Question on Spark architecture and DAG

> Hi,
> 
> I have used Hive on Spark engine and of course Hive tables and its pretty
> impressive comparing Hive using MR engine.
> 
>  
> 
> Let us assume that I use spark shell. Spark shell is a client that connects to
> spark master running on a host and port like below
> 
> spark-shell --master spark://50.140.197.217:7077:
> 
> Ok once I connect I create an RDD to read a text file:
> 
> val oralog = sc.textFile("/test/alert_mydb.log")
> 
> I then search for word Errors in that file
> 
> oralog.filter(line => line.contains("Errors")).collect().foreach(line =>
> println(line))
> 
>  
> 
> Questions:
> 
>  
> 1. In order to display the lines (the result set) containing word "Errors",
> the content of the file (i.e. the blocks on HDFS) need to be read into memory.
> Is my understanding correct that as per RDD notes those blocks from the file
> will be partitioned across the cluster and each node will have its share of
> blocks in memory?


Typically results are written to disk. For example, look at
rdd.saveAsTextFile(). You can also use "collect" to copy the RDD data into
the driver's local memory. You need to be careful that all of the data will fit
in memory.

> 2. Once the result is returned back they need to be sent to the client that
> has made the connection to master. I guess this is a simple TCP operation much
> like any relational database sending the result back?


I run several Spark streaming apps. One collects data, does some clean-up
and publishes the results to downstream systems using ActiveMQ. Some of our
other apps just write to a socket.

> 3. Once the results are returned if no request has been made to keep the data
> in memory, those blocks in memory will be discarded?

There are a couple of things to consider; for example, when your batch job
completes, all memory is returned. Programmatically, you can make RDDs persistent or
cause them to be cached in memory.

> 4. Regardless of the storage block size on disk (128MB, 256MB etc), the memory
> pages are 2K in relational databases? Is this the case in Spark as well?
> Thanks,
> 
>  Mich Talebzadeh
> 
>  
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
>  
> NOTE: The information in this email is proprietary and confidential. This
> message is for the designated recipient only, if you are not the intended
> recipient, you should destroy it immediately. Any information in this message
> shall not be understood as given or endorsed by Peridale Technology Ltd, its
> subsidiaries or their employees, unless expressly so stated. It is the
> responsibility of the recipient to ensure that this email is virus free,
> therefore neither Peridale Technology Ltd, its subsidiaries nor their
> employees accept any responsibility.
>  
>  




Question on Spark architecture and DAG

2016-02-11 Thread Mich Talebzadeh
Hi,

I have used Hive on the Spark engine (and of course Hive tables), and it's pretty
impressive compared to Hive using the MR engine.

 

Let us assume that I use the Spark shell. The Spark shell is a client that connects
to the Spark master running on a host and port, like below:

spark-shell --master spark://50.140.197.217:7077

Ok once I connect I create an RDD to read a text file:

val oralog = sc.textFile("/test/alert_mydb.log")

I then search for word Errors in that file

oralog.filter(line => line.contains("Errors")).collect().foreach(line =>
println(line))

 

Questions:

 

1.  In order to display the lines (the result set) containing the word
"Errors", the content of the file (i.e. the blocks on HDFS) needs to be read
into memory. Is my understanding correct that, as per the RDD notes, those blocks
from the file will be partitioned across the cluster and each node will have
its share of blocks in memory?
2.  Once the result is returned, it needs to be sent to the client
that made the connection to the master. I guess this is a simple TCP
operation, much like any relational database sending the result back?
3.  Once the results are returned, if no request has been made to keep
the data in memory, will those blocks in memory be discarded?
4.  Regardless of the storage block size on disk (128MB, 256MB, etc.),
memory pages are 2K in relational databases. Is this the case in Spark as
well?

Thanks,

 Mich Talebzadeh

 

LinkedIn

https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

  http://talebzadehmich.wordpress.com

 

NOTE: The information in this email is proprietary and confidential. This
message is for the designated recipient only, if you are not the intended
recipient, you should destroy it immediately. Any information in this
message shall not be understood as given or endorsed by Peridale Technology
Ltd, its subsidiaries or their employees, unless expressly so stated. It is
the responsibility of the recipient to ensure that this email is virus free,
therefore neither Peridale Technology Ltd, its subsidiaries nor their
employees accept any responsibility.

 

 



Is there some open source tools which implements draggable widget and make the app running in a form of DAG ?

2016-02-01 Thread zml张明磊
Hello ,

  I am trying to find some tools, but without success. So, as the title describes, is
there an open source tool that implements a draggable widget and lets the app
run in the form of a DAG-like workflow?

Thanks,
Minglei.


DAG visualization: no visualization information available with history server

2016-01-31 Thread Raghava
Hello All,

I am running the history server for a completed application. This
application was run with the following parameters

bin/spark-submit --class  --master local[2] --conf
spark.local.dir=/mnt/ --conf spark.eventLog.dir=/mnt/sparklog/ --conf
spark.eventLog.enabled=true --conf spark.ui.retainedJobs=1 --conf
spark.ui.retainedStages=1 

In the Spark Web UI (http://localhost:18080/), the DAG visualization is only
available for the most recent job. For the rest of the jobs, I get the following
message:

No visualization information available for this job!
If this is an old job, its visualization metadata may have been cleaned up
over time.
You may consider increasing the value of spark.ui.retainedJobs and
spark.ui.retainedStages.

I did increase the retainedJobs and retainedStages values to 10,000. What
else should be done to retain the visualization information for all the
jobs/stages?
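
For reference, a minimal sketch of how these settings are usually applied
programmatically (the values and app name are only examples; the same keys can
be passed with --conf on spark-submit or set in conf/spark-defaults.conf):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("dag-visualization-test")          // example app name
  .set("spark.eventLog.enabled", "true")         // write event logs for the history server
  .set("spark.eventLog.dir", "/mnt/sparklog/")
  .set("spark.ui.retainedJobs", "10000")         // keep UI metadata for more jobs
  .set("spark.ui.retainedStages", "10000")       // keep UI metadata for more stages
val sc = new SparkContext(conf)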

Thanks in advance.

Raghava.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DAG-visualization-no-visualization-information-available-with-history-server-tp26117.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark RDD DAG behaviour understanding in case of checkpointing

2016-01-25 Thread Tathagata Das
First of all, if you are running batches of 15 minutes and you don't need
second-level latencies, it might just be easier to run batch jobs in a for
loop - you will have greater control over what is going on.
And if you are using reduceByKeyAndWindow without the
inverseReduceFunction, then Spark has to read all of the 2 hours of data back
after recovering the driver from checkpoint files if it has to calculate
aggregations on the last two hours of data (the data read earlier is lost when
the driver is dead).
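
For reference, a minimal sketch of the incremental form TD mentions, assuming
a DStream of (key, count) pairs named `pairs`, a 2-hour window and a 15-minute
slide (the names are placeholders; this form requires checkpointing to be
enabled):

import org.apache.spark.streaming.Minutes

// With an inverse reduce function, Spark only adds the new 15-minute batch and
// subtracts the batch that falls out of the 2-hour window, instead of
// re-reducing the entire window on every slide.
val windowedCounts = pairs.reduceByKeyAndWindow(
  (a: Long, b: Long) => a + b,   // add counts entering the window
  (a: Long, b: Long) => a - b,   // subtract counts leaving the window
  Minutes(120),                  // window length: 2 hours
  Minutes(15))                   // slide interval: 15 minutes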

On Sat, Jan 23, 2016 at 1:47 PM, gaurav sharma 
wrote:

> Hi Tathagata/Cody,
>
> I am facing a challenge in Production with DAG behaviour during
> checkpointing in spark streaming -
>
> Step 1 : Read data from Kafka every 15 min -  call this KafkaStreamRDD ~
> 100 GB of data
>
> Step 2 : Repartition KafkaStreamRdd from 5 to 100 partitions to
> parallelise processing - call this RepartitionedKafkaStreamRdd
>
> Step 3 : on this RepartitionedKafkaStreamRdd I run map and
> reduceByKeyAndWindow over a window of 2 hours, call this RDD1 ~ 100 MB of
> data
>
> Checkpointing is enabled.
>
> If i restart my streaming context, it picks up from last checkpointed
> state,
>
> READS data for all the 8 SUCCESSFULLY FINISHED 15 minute batches from
> Kafka , re-performs Repartition of all the data of all these 8 , 15 minute
> batches.
>
> Then reads data for current 15 minute batch and runs map and
> reduceByKeyAndWindow over a window of 2 hours.
>
> Challenge -
> 1> I cant checkpoint KafkaStreamRDD or RepartitionedKafkaStreamRdd as this
> is huge data around 800GB for 2 hours, reading and writing (checkpointing)
> this at every 15 minutes would be very slow.
>
> 2> When i have checkpointed data of RDD1 at every 15 minutes, and map and
> reduceByKeyAndWindow is being run over RDD1 only, and i have snapshot of
> all of the last 8, 15 minute batches of RDD1,
> why is spark reading all the data for last 8 successfully completed
> batches from Kafka again(Step 1) and again performing re-partitioning(Step
> 2) and then again running map and reduceByKeyandWindow over these newly
> fetched kafkaStreamRdd data of last 8 , 15 minute batches.
>
> Because of above mentioned challenges, i am not able to exploit
> checkpointing, in case streaming context is restarted at high load.
>
> Please help out in understanding, if there is something that i am missing
>
> Regards,
> Gaurav
>


Re: Spark RDD DAG behaviour understanding in case of checkpointing

2016-01-25 Thread Cody Koeninger
Where are you calling checkpointing? Metadata checkpointing for a Kafka
direct stream should just be the offsets, not the data.

TD can better speak to reduceByKeyAndWindow behavior when restoring from a
checkpoint, but ultimately the only available choices would be: replay the
prior window data from Kafka; replay the prior window data from checkpoint
/ other storage (not much reason for this, since it's stored in Kafka); or
lose the prior window data.
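
For what it's worth, a minimal sketch of how driver-recovery checkpointing is
usually wired up around a streaming context (the checkpoint path is a
placeholder, and the actual Kafka stream and windowed computation are elided):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

val checkpointDir = "hdfs:///tmp/streaming-checkpoint"   // placeholder path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("kafka-windowed-counts")
  val ssc = new StreamingContext(conf, Minutes(15))
  ssc.checkpoint(checkpointDir)   // metadata checkpointing (offsets, DAG) goes here
  // ... build the direct Kafka stream and the windowed computation here ...
  ssc
}

// On restart, the context (and the Kafka offsets) are restored from the
// checkpoint instead of being rebuilt from scratch.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()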



On Sat, Jan 23, 2016 at 3:47 PM, gaurav sharma 
wrote:

> Hi Tathagata/Cody,
>
> I am facing a challenge in Production with DAG behaviour during
> checkpointing in spark streaming -
>
> Step 1 : Read data from Kafka every 15 min -  call this KafkaStreamRDD ~
> 100 GB of data
>
> Step 2 : Repartition KafkaStreamRdd from 5 to 100 partitions to
> parallelise processing - call this RepartitionedKafkaStreamRdd
>
> Step 3 : on this RepartitionedKafkaStreamRdd I run map and
> reduceByKeyAndWindow over a window of 2 hours, call this RDD1 ~ 100 MB of
> data
>
> Checkpointing is enabled.
>
> If i restart my streaming context, it picks up from last checkpointed
> state,
>
> READS data for all the 8 SUCCESSFULLY FINISHED 15 minute batches from
> Kafka , re-performs Repartition of all the data of all these 8 , 15 minute
> batches.
>
> Then reads data for current 15 minute batch and runs map and
> reduceByKeyAndWindow over a window of 2 hours.
>
> Challenge -
> 1> I cant checkpoint KafkaStreamRDD or RepartitionedKafkaStreamRdd as this
> is huge data around 800GB for 2 hours, reading and writing (checkpointing)
> this at every 15 minutes would be very slow.
>
> 2> When i have checkpointed data of RDD1 at every 15 minutes, and map and
> reduceByKeyAndWindow is being run over RDD1 only, and i have snapshot of
> all of the last 8, 15 minute batches of RDD1,
> why is spark reading all the data for last 8 successfully completed
> batches from Kafka again(Step 1) and again performing re-partitioning(Step
> 2) and then again running map and reduceByKeyandWindow over these newly
> fetched kafkaStreamRdd data of last 8 , 15 minute batches.
>
> Because of above mentioned challenges, i am not able to exploit
> checkpointing, in case streaming context is restarted at high load.
>
> Please help out in understanding, if there is something that i am missing
>
> Regards,
> Gaurav
>


Spark RDD DAG behaviour understanding in case of checkpointing

2016-01-23 Thread gaurav sharma
Hi Tathagata/Cody,

I am facing a challenge in Production with DAG behaviour during
checkpointing in spark streaming -

Step 1 : Read data from Kafka every 15 min -  call this KafkaStreamRDD ~
100 GB of data

Step 2 : Repartition KafkaStreamRdd from 5 to 100 partitions to parallelise
processing - call this RepartitionedKafkaStreamRdd

Step 3 : on this RepartitionedKafkaStreamRdd I run map and
reduceByKeyAndWindow over a window of 2 hours, call this RDD1 ~ 100 MB of
data

Checkpointing is enabled.

If I restart my streaming context, it picks up from the last checkpointed
state,

READS data for all 8 SUCCESSFULLY FINISHED 15-minute batches from Kafka,
re-performs the Repartition of all the data of all these 8 15-minute
batches.

Then it reads data for the current 15-minute batch and runs map and
reduceByKeyAndWindow over a window of 2 hours.

Challenge -
1> I can't checkpoint KafkaStreamRDD or RepartitionedKafkaStreamRdd as this
is huge data, around 800GB for 2 hours; reading and writing (checkpointing)
this every 15 minutes would be very slow.

2> When I have checkpointed data of RDD1 every 15 minutes, and map and
reduceByKeyAndWindow are being run over RDD1 only, and I have a snapshot of
all of the last 8 15-minute batches of RDD1,
why is Spark reading all the data for the last 8 successfully completed batches
from Kafka again (Step 1), again performing re-partitioning (Step 2) and
then again running map and reduceByKeyAndWindow over these newly fetched
KafkaStreamRDD data of the last 8 15-minute batches?

Because of the above-mentioned challenges, I am not able to exploit
checkpointing in case the streaming context is restarted at high load.

Please help out in understanding if there is something that I am missing.

Regards,
Gaurav


Re: Dynamic DAG use-case for spark streaming.

2015-09-29 Thread Tathagata Das
A very basic support that is there in DStream is DStream.transform(), which
takes an arbitrary RDD => RDD function. This function can actually choose to do a
different computation over time. That may be of help to you.
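
For example, a small sketch of transform() re-evaluating its logic per batch;
`lines` is assumed to be a DStream[String], and currentFilter() is a
hypothetical function that reads the latest user-supplied filter from some
shared store:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

val filtered = lines.transform { (rdd: RDD[String], batchTime: Time) =>
  val f = currentFilter()               // re-evaluated for every batch interval
  rdd.filter(line => line.contains(f))  // so the per-batch computation can change over time
}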

On Tue, Sep 29, 2015 at 12:06 PM, Archit Thakur 
wrote:

> Hi,
>
>  We are using spark streaming as our processing engine, and as part of
> output we want to push the data to UI. Now there would be multiple users
> accessing the system with there different filters on. Based on the filters
> and other inputs we want to either run a SQL Query on DStream or do a
> custom logic processing. This would need the system to read the
> filters/query and generate the execution graph at runtime. I cant see any
> support in spark streaming for generating the execution graph on the fly.
> I think I can broadcast the query to executors and read the broadcasted
> query at runtime but that would also limit my user to 1 at a time.
>
> Do we not expect the spark streaming to take queries/filters from outside
> world. Does output in spark streaming only means outputting to an external
> source which could then be queried.
>
> Thanks,
> Archit Thakur.
>


Dynamic DAG use-case for spark streaming.

2015-09-29 Thread Archit Thakur
Hi,

 We are using Spark streaming as our processing engine, and as part of the
output we want to push the data to a UI. Now there would be multiple users
accessing the system with their different filters on. Based on the filters
and other inputs, we want to either run a SQL query on the DStream or do
custom logic processing. This would need the system to read the
filters/query and generate the execution graph at runtime. I can't see any
support in Spark streaming for generating the execution graph on the fly.
I think I can broadcast the query to the executors and read the broadcasted
query at runtime, but that would also limit me to 1 user at a time.

Do we not expect Spark streaming to take queries/filters from the outside
world? Does output in Spark streaming only mean outputting to an external
source which could then be queried?

Thanks,
Archit Thakur.


Re: DAG Scheduler deadlock when two RDDs reference each other, force Stages manually?

2015-09-14 Thread Petros Nyfantis
) + A(1) + A(2)
B(2) = A(0)
}

To do such a computation in spark, I used
A = A.map( (key,val) => { B.aggregate(...) })
B = B.map( (key,val) => { A.aggregate(...) })

where the key of each mapped element, keyA, is passed into the aggregate
function as an initialization parameter, and then for each B element key keyB,
if M(keyA, keyB) == 1
then that B element is taken into account in the summation.

The calculation of A is done successfully and correctly, but then the DAG
scheduler seems to deadlock when the calculation of B happens. This behaviour goes
away when I remove the A.aggregate bit in my code. Apparently, according to the
logs, the scheduler is expecting some results before it can go on, but the results
should already have been calculated.

I assume that this has to do with the DAG scheduling not handling cyclical
dependencies.
Is there a way I can force each iteration or update of A and B to be seen as
a separate
stage? Otherwise, how can I implement this type of aggregation in another
way? (It could
be the equivalent of mapping the A elements to a List of all the B elements
for which the M
matrix entry is 1 and then mapping again to their sum, but this means I need
a lot of space
especially when the problem in hand could be very large, which is
unfeasible, so I need to avoid this)

Thanks in advance for your help!





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DAG-Scheduler-deadlock-when-two-RDDs-reference-each-other-force-Stages-manually-tp24684.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DAG Scheduler deadlock when two RDDs reference each other, force Stages manually?

2015-09-14 Thread Sean Owen
There isn't a cycle in your graph, since although you reuse reference
variables in your code called A and B you are in fact creating new
RDDs at each operation. You have some other problem, and you'd have to
provide detail on why you think something is deadlocked, like a thread
dump.

On Mon, Sep 14, 2015 at 10:42 AM, petranidis  wrote:
> Hi all,
>
> I am new to spark and I have writen a few spark programs mostly around
> machine learning
> applications.
>
> I am trying to resolve a particular problem where there are two RDDs that
> should be updated
> by using elements of each other. More specifically, if the two pair RDDs are
> called A and B M
> is a matrix that specifies which elements of each RDD should be taken into
> account when
> computing the other with rows of M corresponding to elements of A and
> columns to elements
> of B e.g.
>
> A = (0, 6), (1,7), (2,8)
> B = (0, 4), (1,6), (2,1)
> and
> M =
>  0 1 1
>  1 1 0
> 0 1 0
>
> Then
>
> for (it =0;it < 10; it++) {
> A(0) = B(1) + B(2)
> A(1) = B(0) + B(1)
> A(2) = B(1)
> B(0) = A(1)
> B(1) = A(0) + A(1) + A(2)
> B(2) = A(0)
> }
>
> To do such a computation in spark, I used
> A = A.map( (key,val) => { B.aggregate(...) })
> B = B.map( (key,val) => { A.aggregate(...) })
>
> where if the key of each mapped element keyA is passed in the aggregate
> function as a
> initialization parameter and then for each B element key keyB, if M(keyA,
> keyB) ==1
> then the B element is being taken into account in the summation.
>
> The calculation of A is done successfully and correctly, but then the DAG
> scheduler
> seems to deadlock when the calculation of B happens. This behaviour goes
> away
> when I remove the A.aggregate bit in my code. Apparently according to the
> logs the
> scheduler is expecting some results before if can go on but the results
> should already
> have been calculated.
>
> I assume that this has to do with the DAG scheduling not handling cyclical
> dependencies.
> Is there a way I can force each iteration or update of A and B to be seen as
> a separate
> stage? Otherwise, how can I implement this type of aggregation in another
> way? (It could
> be the equivalent of mapping the A elements to a List of all the B elements
> for which the M
> matrix entry is 1 and then mapping again to their sum, but this means I need
> a lot of space
> especially when the problem in hand could be very large, which is
> unfeasible, so I need to avoid this)
>
> Thanks in advance for your help!
>
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/DAG-Scheduler-deadlock-when-two-RDDs-reference-each-other-force-Stages-manually-tp24684.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



DAG Scheduler deadlock when two RDDs reference each other, force Stages manually?

2015-09-14 Thread petranidis
Hi all, 

I am new to Spark and I have written a few Spark programs, mostly around
machine learning applications.

I am trying to resolve a particular problem where there are two RDDs that
should be updated
by using elements of each other. More specifically, if the two pair RDDs are
called A and B M
is a matrix that specifies which elements of each RDD should be taken into
account when
computing the other with rows of M corresponding to elements of A and
columns to elements
of B e.g.

A = (0, 6), (1,7), (2,8)
B = (0, 4), (1,6), (2,1)
and
M =
 0 1 1
 1 1 0
 0 1 0

Then

for (it =0;it < 10; it++) {
A(0) = B(1) + B(2)
A(1) = B(0) + B(1)
A(2) = B(1)
B(0) = A(1)
B(1) = A(0) + A(1) + A(2)
B(2) = A(0)
}

To do such a computation in spark, I used
A = A.map( (key,val) => { B.aggregate(...) }) 
B = B.map( (key,val) => { A.aggregate(...) }) 

where the key of each mapped element, keyA, is passed into the aggregate
function as an initialization parameter, and then for each B element key keyB,
if M(keyA, keyB) == 1
then that B element is taken into account in the summation.

The calculation of A is done successfully and correctly, but then the DAG
scheduler seems to deadlock when the calculation of B happens. This behaviour goes
away when I remove the A.aggregate bit in my code. Apparently, according to the
logs, the scheduler is expecting some results before it can go on, but the results
should already have been calculated.

I assume that this has to do with the DAG scheduling not handling cyclical
dependencies.
Is there a way I can force each iteration or update of A and B to be seen as
a separate
stage? Otherwise, how can I implement this type of aggregation in another
way? (It could
be the equivalent of mapping the A elements to a List of all the B elements
for which the M
matrix entry is 1 and then mapping again to their sum, but this means I need
a lot of space 
especially when the problem in hand could be very large, which is
unfeasible, so I need to avoid this)

Thanks in advance for your help!





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DAG-Scheduler-deadlock-when-two-RDDs-reference-each-other-force-Stages-manually-tp24684.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to create combine DAG visualization?

2015-09-10 Thread b.bhavesh
Hi,

How can I create a combined DAG visualization of PySpark code, instead of
separate DAGs for jobs and stages?

Thanks 
b.bhavesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-combine-DAG-visualization-tp24653.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DAG related query

2015-08-20 Thread Andrew Or
Hi Bahubali,

Once RDDs are created, they are immutable (in most cases). In your case you
end up with 3 RDDs:

(1) the original rdd1 that reads from the text file
(2) rdd2, that applies a map function on (1), and
(3) the new rdd1 that applies a map function on (2)

There's no cycle because you have 3 distinct RDDs. All you're doing is
reassigning a reference `rdd1`, but the underlying RDD doesn't change.
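
A small illustration of the three RDDs described above (the path and the map
functions are placeholders, not from the original thread):

var rdd1 = sc.textFile("/data/input.txt")   // (1) the original rdd1
val rdd2 = rdd1.map(_.toUpperCase)          // (2) a map on (1)
rdd1 = rdd2.map(_.trim)                     // (3) a map on (2); only the reference name is reused

// toDebugString shows the linear lineage (3) <- (2) <- (1); there is no cycle.
println(rdd1.toDebugString)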

-Andrew

2015-08-20 6:21 GMT-07:00 Sean Owen :

> No. The third line creates a third RDD whose reference simply replaces
> the reference to the first RDD in your local driver program. The first
> RDD still exists.
>
> On Thu, Aug 20, 2015 at 2:15 PM, Bahubali Jain  wrote:
> > Hi,
> > How would the DAG look like for the below code
> >
> > JavaRDD rdd1 = context.textFile();
> > JavaRDD rdd2 = rdd1.map();
> > rdd1 =  rdd2.map();
> >
> > Does this lead to any kind of cycle?
> >
> > Thanks,
> > Baahu
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: DAG related query

2015-08-20 Thread Sean Owen
No. The third line creates a third RDD whose reference simply replaces
the reference to the first RDD in your local driver program. The first
RDD still exists.

On Thu, Aug 20, 2015 at 2:15 PM, Bahubali Jain  wrote:
> Hi,
> How would the DAG look like for the below code
>
> JavaRDD rdd1 = context.textFile();
> JavaRDD rdd2 = rdd1.map();
> rdd1 =  rdd2.map();
>
> Does this lead to any kind of cycle?
>
> Thanks,
> Baahu

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



DAG related query

2015-08-20 Thread Bahubali Jain
Hi,
What would the DAG look like for the code below?

JavaRDD rdd1 = context.textFile();
JavaRDD rdd2 = rdd1.map();
rdd1 =  rdd2.map();

Does this lead to any kind of cycle?

Thanks,
Baahu


Re: DataFrame DAG recomputed even though DataFrame is cached?

2015-07-28 Thread Michael Armbrust
We will try to address this before Spark 1.5 is released:
https://issues.apache.org/jira/browse/SPARK-9141

On Tue, Jul 28, 2015 at 11:50 AM, Kristina Rogale Plazonic  wrote:

> Hi,
>
> I'm puzzling over the following problem: when I cache a small sample of a
> big dataframe, the small dataframe is recomputed when selecting a column
> (but not if show() or count() is invoked).
>
> Why is that so and how can I avoid recomputation of the small sample
> dataframe?
>
> More details:
>
> - I have a big dataframe "df" of ~190million rows and ~10 columns,
> obtained via 3 different joins; I cache it and invoke count() to make sure
> it really is in memory and confirm in web UI
>
> - val sdf = df.sample(false, 1e-6); sdf.cache(); sdf.count()  // 170
> lines; cached is also confirmed in webUI, size in memory is 150kB
>
> *- sdf.select("colname").show()   // this triggers a complete
> recomputation of sdf with 3 joins!*
>
> - show(), count() or take() do not trigger the recomputation of the 3
> joins, but select(), collect() or withColumn() do.
>
> I have --executor-memory 30G --driver-memory 10g, so memory is not a
> problem. I'm using Spark 1.4.0. Could anybody shed some light on this or
> where I can find more info?
>
> Many thanks,
> Kristina
>


DataFrame DAG recomputed even though DataFrame is cached?

2015-07-28 Thread Kristina Rogale Plazonic
Hi,

I'm puzzling over the following problem: when I cache a small sample of a
big dataframe, the small dataframe is recomputed when selecting a column
(but not if show() or count() is invoked).

Why is that so and how can I avoid recomputation of the small sample
dataframe?

More details:

- I have a big dataframe "df" of ~190million rows and ~10 columns, obtained
via 3 different joins; I cache it and invoke count() to make sure it really
is in memory and confirm in web UI

- val sdf = df.sample(false, 1e-6); sdf.cache(); sdf.count()  // 170 lines;
cached is also confirmed in webUI, size in memory is 150kB

*- sdf.select("colname").show()   // this triggers a complete recomputation
of sdf with 3 joins!*

- show(), count() or take() do not trigger the recomputation of the 3
joins, but select(), collect() or withColumn() do.

I have --executor-memory 30G --driver-memory 10g, so memory is not a
problem. I'm using Spark 1.4.0. Could anybody shed some light on this or
where I can find more info?

Many thanks,
Kristina


Building DAG from log

2015-05-04 Thread Giovanni Paolo Gibilisco
Hi,
I'm trying to build the DAG of an application from the logs.
I've had a look at SparkReplayDebugger, but it doesn't operate offline on
logs. I also looked at the one in this pull request:
https://github.com/apache/spark/pull/2077 which seems to operate only on
logs, but it doesn't clearly show the dependencies between the stages.
Is there some other tool to do this?

In the log files I could not find the information needed to determine the
dependencies between the stages; is there any other way to derive this
information offline?
Thanks,


Re: DAG

2015-04-25 Thread Corey Nolet
Giovanni,

The DAG can be walked by calling the "dependencies()" function on any RDD.
It returns a Seq of dependencies, each of which points to a parent RDD. If you
start at the leaves and walk through the parents until dependencies() returns
an empty Seq, you ultimately have your DAG.
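
A small sketch of that walk (not from the original thread): dependencies()
returns a Seq[Dependency[_]], and each Dependency exposes its parent RDD via
.rdd:

import org.apache.spark.rdd.RDD

def walkDag(rdd: RDD[_], depth: Int = 0): Unit = {
  println(("  " * depth) + rdd)                                 // print the current node
  rdd.dependencies.foreach(dep => walkDag(dep.rdd, depth + 1))  // recurse until the Seq is empty
}

// Example: walkDag(sc.textFile("/data/input.txt").map(_.length).filter(_ > 0))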

On Sat, Apr 25, 2015 at 1:28 PM, Akhil Das 
wrote:

> May be this will give you a good start
> https://github.com/apache/spark/pull/2077
>
> Thanks
> Best Regards
>
> On Sat, Apr 25, 2015 at 1:29 AM, Giovanni Paolo Gibilisco <
> gibb...@gmail.com> wrote:
>
>> Hi,
>> I would like to know if it is possible to build the DAG before actually
>> executing the application. My guess is that in the scheduler the DAG is
>> built dynamically at runtime since it might depend on the data, but I was
>> wondering if there is a way (and maybe a tool already) to analyze the code
>> and build the DAG.
>>
>> Thank you!
>>
>
>


Re: DAG

2015-04-25 Thread Akhil Das
May be this will give you a good start
https://github.com/apache/spark/pull/2077

Thanks
Best Regards

On Sat, Apr 25, 2015 at 1:29 AM, Giovanni Paolo Gibilisco  wrote:

> Hi,
> I would like to know if it is possible to build the DAG before actually
> executing the application. My guess is that in the scheduler the DAG is
> built dynamically at runtime since it might depend on the data, but I was
> wondering if there is a way (and maybe a tool already) to analyze the code
> and build the DAG.
>
> Thank you!
>


DAG

2015-04-24 Thread Giovanni Paolo Gibilisco
Hi,
I would like to know if it is possible to build the DAG before actually
executing the application. My guess is that in the scheduler the DAG is
built dynamically at runtime since it might depend on the data, but I was
wondering if there is a way (and maybe a tool already) to analyze the code
and build the DAG.

Thank you!


Re: Spark Application Stages and DAG

2015-04-07 Thread Vijay Innamuri
My Spark streaming application processes the data received in each interval.

In the Spark Stages UI, all the stages point to a single line of code,
*windowDStream.foreachRDD*, only (not the actions inside the DStream).


   - Following is the information from the Spark Stages UI page (columns are
     Stage Id, Description, Submitted, Duration, Tasks: Succeeded/Total,
     Input, Output, Shuffle Read, Shuffle Write):

   Stage 2: foreachRDD at Parser.scala:58  submitted 06-04-2015 16:21  duration 19 min
            tasks 3125/3125 (43 failed)    154.4 MB  23.9 MB
   Stage 1: foreachRDD at Parser.scala:58  submitted 06-04-2015 16:19  duration 2.3 min
            tasks 3125/3125                149.7 MB
   Stage 0: foreachRDD at Parser.scala:58  submitted 06-04-2015 16:16  duration 3.0 min
            tasks 3125/3125                149.7 MB


   - Following is the code snippet at Parser.scala:58:

val windowDStream = ssc.fileStream[LongWritable, Text,
CustomInputFormat](args(0), (x : Path) => true, false)
windowDStream.foreachRDD { IncomingFiles =>

println("Interval data processing " + Calendar.getInstance().getTime());
if (IncomingFiles.count() == 0) {
println("No files received in this interval")
} else {
println(IncomingFiles.count()+" files received in this
interval");
//convert each xml text to RDD[Elem]
val inputRDD = IncomingFiles.map(eachXML => {
MyXML.loadString(eachXML._2.toString().trim().replaceFirst("^([\\W]+)<",
"<")) });
//Create a schema RDD for querying the data
val MySchemaRDD = inputRDD.map(x => {

Bd((x \\ "Oied" \\ "oeuo").text, List("placeholder1",
"placeholder2", "placeholder3"))
//Bd is a case class - case class Bd(oeuo : String, mi
: List[String])
})
// Save the file for debuging
MySchemaRDD.saveAsTextFile("/home/spark/output/result.txt")
//Spark SQL processing starts from here
MySchemaRDD.registerTempTable("MySchemaTable")
//Todo processing with Sparl-SQL
MySchemaRDD.printSchema()

println("end of processing");

}
}

Spark UI Details for Stage 2
http://pastebin.com/c2QYeSJj

I have tested this with 150 MB of input data, with all the Spark memory
options at their defaults and executor memory of 512.0 MB.


   - Is it possible to see the stage information for the operations inside
   *windowDStream* (i.e. which action inside the DStream processing each stage
   corresponds to)?


   - During Stage 2 the executor restarted many times due to
   OutOfMemoryError. Is this expected behavior? (Please find the Stage 2
   details.)


Regards
Vijay

On 3 April 2015 at 13:21, Tathagata Das  wrote:

> What he meant is that look it up in the Spark UI, specifically in the
> Stage tab to see what is taking so long. And yes code snippet helps us
> debug.
>
> TD
>
> On Fri, Apr 3, 2015 at 12:47 AM, Akhil Das 
> wrote:
>
>> You need open the Stage's page which is taking time, and see how long
>> its spending on GC etc. Also it will be good to post that Stage and its
>> previous transformation's code snippet to make us understand it better.
>>
>> Thanks
>> Best Regards
>>
>> On Fri, Apr 3, 2015 at 1:05 PM, Vijay Innamuri 
>> wrote:
>>
>>>
>>> When I run the Spark application (streaming) in local mode I could see
>>> the execution progress as below..
>>>
>>> [Stage
>>> 0:>
>>> (1817 + 1) / 3125]
>>> 
>>> [Stage
>>> 2:===>
>>> (740 + 1) / 3125]
>>>
>>> One of the stages is taking long time for execution.
>>>
>>> How to find the transformations/ actions associated with a particular
>>> stage?
>>> Is there anyway to find the execution DAG of a Spark Application?
>>>
>>> Regards
>>> Vijay
>>>
>>
>>
>


Re: Spark Application Stages and DAG

2015-04-03 Thread Tathagata Das
What he meant is that look it up in the Spark UI, specifically in the Stage
tab to see what is taking so long. And yes code snippet helps us debug.

TD

On Fri, Apr 3, 2015 at 12:47 AM, Akhil Das 
wrote:

> You need open the Stage's page which is taking time, and see how long its
> spending on GC etc. Also it will be good to post that Stage and its
> previous transformation's code snippet to make us understand it better.
>
> Thanks
> Best Regards
>
> On Fri, Apr 3, 2015 at 1:05 PM, Vijay Innamuri 
> wrote:
>
>>
>> When I run the Spark application (streaming) in local mode I could see
>> the execution progress as below..
>>
>> [Stage
>> 0:>
>> (1817 + 1) / 3125]
>> 
>> [Stage
>> 2:===>
>> (740 + 1) / 3125]
>>
>> One of the stages is taking long time for execution.
>>
>> How to find the transformations/ actions associated with a particular
>> stage?
>> Is there anyway to find the execution DAG of a Spark Application?
>>
>> Regards
>> Vijay
>>
>
>


Re: Spark Application Stages and DAG

2015-04-03 Thread Akhil Das
You need to open the Stage's page which is taking time, and see how long it's
spending on GC etc. Also it will be good to post that Stage and its
previous transformation's code snippet to help us understand it better.

Thanks
Best Regards

On Fri, Apr 3, 2015 at 1:05 PM, Vijay Innamuri 
wrote:

>
> When I run the Spark application (streaming) in local mode I could see the
> execution progress as below..
>
> [Stage
> 0:>
> (1817 + 1) / 3125]
> 
> [Stage
> 2:===>
> (740 + 1) / 3125]
>
> One of the stages is taking long time for execution.
>
> How to find the transformations/ actions associated with a particular
> stage?
> Is there anyway to find the execution DAG of a Spark Application?
>
> Regards
> Vijay
>


Spark Application Stages and DAG

2015-04-03 Thread Vijay Innamuri
When I run the Spark application (streaming) in local mode I could see the
execution progress as below..

[Stage 0:> (1817 + 1) / 3125]

[Stage 2:===> (740 + 1) / 3125]

One of the stages is taking long time for execution.

How do I find the transformations/actions associated with a particular stage?
Is there any way to find the execution DAG of a Spark application?

Regards
Vijay


Support for Data flow graphs and not DAG only

2015-04-02 Thread anshu shukla
Hey,

I didn't find any documentation regarding support for cycles in a Spark
topology, although Storm supports this using manual configuration in the
acker function logic (setting it to a particular count). By cycles I
don't mean infinite loops.

-- 
Thanks & Regards,
Anshu Shukla


question regarding the dependency DAG in Spark

2015-03-16 Thread Grandl Robert
Hi guys,

I am trying to get a better understanding of the DAG generation for a job in
Spark.

Ideally, what I want is to run some SQL query and extract the DAG generated by
Spark. By DAG I mean the stages and the dependencies among stages, and the number
of tasks in every stage.

Could you guys point me to the code where that is happening?

Thank you,
Robert



Re: Visualizing the DAG of a Spark application

2015-03-13 Thread t1ny
For anybody who's interested in this, here's a link to a PR that addresses
this feature:
https://github.com/apache/spark/pull/2077

(thanks to Todd Nist for sending it to me)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-the-DAG-of-a-Spark-application-tp22033p22037.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Visualizing the DAG of a Spark application

2015-03-13 Thread Todd Nist
There is the PR https://github.com/apache/spark/pull/2077 for doing this.

On Fri, Mar 13, 2015 at 6:42 AM, t1ny  wrote:

> Hi all,
>
> We are looking for a tool that would let us visualize the DAG generated by
> a
> Spark application as a simple graph.
> This graph would represent the Spark Job, its stages and the tasks inside
> the stages, with the dependencies between them (either narrow or shuffle
> dependencies).
>
> The Spark Replay Debugger
> (http://spark-replay-debugger-overview.readthedocs.org/en/latest/)
> seemed like a perfect tool for this, unfortunately it doesn't seem to be
> maintained anymore.
>
> What is the state of the Spark Replay Debugger ? Has this project been
> "dropped" or has it in fact been ported to the latest versions of Spark ?
> Is there any other similar tool out there that we could use ?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-the-DAG-of-a-Spark-application-tp22033.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Visualizing the DAG of a Spark application

2015-03-13 Thread t1ny
Hi all,

We are looking for a tool that would let us visualize the DAG generated by a
Spark application as a simple graph. 
This graph would represent the Spark Job, its stages and the tasks inside
the stages, with the dependencies between them (either narrow or shuffle
dependencies).

The Spark Replay Debugger
(http://spark-replay-debugger-overview.readthedocs.org/en/latest/)
seemed like a perfect tool for this, unfortunately it doesn't seem to be
maintained anymore.

What is the state of the Spark Replay Debugger ? Has this project been
"dropped" or has it in fact been ported to the latest versions of Spark ? 
Is there any other similar tool out there that we could use ?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-the-DAG-of-a-Spark-application-tp22033.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: If an RDD appeared twice in a DAG, of which calculation is triggered by a single action, will this RDD be calculated twice?

2015-01-19 Thread Tobias Pfeiffer
Hi,

On Sat, Jan 17, 2015 at 3:37 AM, Peng Cheng  wrote:

> I'm talking about RDD1 (not persisted or checkpointed) in this situation:
>
> ...(somewhere) -> RDD1 -> RDD2
>                    |       |
>                    V       V
>                   RDD3 -> RDD4 -> Action!
>
> To my experience the change RDD1 get recalculated is volatile, sometimes
> once, sometimes twice.


That should not happen if your access pattern to RDD2 and RDD3 is always
the same.

A related problem might be in $SQLContest.jsonRDD(), since the source
> jsonRDD is used twice (one for schema inferring, another for data read). It
> almost guarantees that the source jsonRDD is calculated twice. Has this
> problem be addressed so far?
>

That's exactly why schema inference is expensive. However, I am afraid that in
general you have to make a decision between "store" and "recompute" (cf.
http://en.wikipedia.org/wiki/Space%E2%80%93time_tradeoff). There is no way
to avoid recomputation on each access other than storing the value, I
guess.
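
As a minimal sketch of the diamond above with RDD1 persisted, so the single
action computes it only once (the path and transformations are placeholders):

val rdd1 = sc.textFile("/data/input.txt").map(_.trim)   // imagine this map is the expensive part
rdd1.cache()

val rdd2 = rdd1.map(_.toUpperCase)
val rdd3 = rdd1.filter(_.nonEmpty)
val rdd4 = rdd2.union(rdd3)

rdd4.count()      // the Action!; rdd1 is materialised once and reused from the cache
rdd1.unpersist()  // free the cached blocks once nothing downstream needs rdd1 any more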

Tobias


Re: If an RDD appeared twice in a DAG, of which calculation is triggered by a single action, will this RDD be calculated twice?

2015-01-19 Thread Xuefeng Wu
I think it's always twice; could you provide a demo case where
RDD1 is calculated only once?

On Sat, Jan 17, 2015 at 2:37 AM, Peng Cheng  wrote:

> I'm talking about RDD1 (not persisted or checkpointed) in this situation:
>
> ...(somewhere) -> RDD1 -> RDD2
>                    |       |
>                    V       V
>                   RDD3 -> RDD4 -> Action!
>
> To my experience the change RDD1 get recalculated is volatile, sometimes
> once, sometimes twice. When calculation of this RDD is expensive (e.g.
> involves using an RESTful service that charges me money), this compels me
> to
> persist RDD1 which takes extra memory, and in case the Action! doesn't
> always happen, I don't know when to unpersist it to  free those memory.
>
> A related problem might be in $SQLContest.jsonRDD(), since the source
> jsonRDD is used twice (one for schema inferring, another for data read). It
> almost guarantees that the source jsonRDD is calculated twice. Has this
> problem be addressed so far?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/If-an-RDD-appeared-twice-in-a-DAG-of-which-calculation-is-triggered-by-a-single-action-will-this-RDD-tp21192.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 

~Yours, Xuefeng Wu/吴雪峰  敬上


If an RDD appeared twice in a DAG, of which calculation is triggered by a single action, will this RDD be calculated twice?

2015-01-16 Thread Peng Cheng
I'm talking about RDD1 (not persisted or checkpointed) in this situation:

...(somewhere) -> RDD1 -> RDD2
                   |       |
                   V       V
                  RDD3 -> RDD4 -> Action!

In my experience, the chance that RDD1 gets recalculated is volatile: sometimes
once, sometimes twice. When the calculation of this RDD is expensive (e.g. it
involves using a RESTful service that charges me money), this compels me
to persist RDD1, which takes extra memory, and in case the Action! doesn't
always happen, I don't know when to unpersist it to free that memory.

A related problem might be in SQLContext.jsonRDD(), since the source
jsonRDD is used twice (once for schema inference, once for the data read). It
almost guarantees that the source jsonRDD is calculated twice. Is there a
way to solve (or circumvent) this problem?


If an RDD appeared twice in a DAG, of which calculation is triggered by a single action, will this RDD be calculated twice?

2015-01-16 Thread Peng Cheng
I'm talking about RDD1 (not persisted or checkpointed) in this situation:

...(somewhere) -> RDD1 -> RDD2
                   |       |
                   V       V
                  RDD3 -> RDD4 -> Action!

In my experience, the chance that RDD1 gets recalculated is volatile: sometimes
once, sometimes twice. When the calculation of this RDD is expensive (e.g. it
involves using a RESTful service that charges me money), this compels me
to persist RDD1, which takes extra memory, and in case the Action! doesn't
always happen, I don't know when to unpersist it to free that memory.

A related problem might be in SQLContext.jsonRDD(), since the source
jsonRDD is used twice (once for schema inference, once for the data read). It
almost guarantees that the source jsonRDD is calculated twice. Has this
problem been addressed so far?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/If-an-RDD-appeared-twice-in-a-DAG-of-which-calculation-is-triggered-by-a-single-action-will-this-RDD-tp21192.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Strange DAG scheduling behavior on currently dependent RDDs

2015-01-07 Thread Corey Nolet
I asked this question too soon. I am caching off a bunch of RDDs in a
TrieMap so that our framework can wire them together, and the locking was
not completely correct; therefore it was at times creating multiple new RDDs
instead of using the cached versions, which created completely
separate lineages.

What's strange is that this bug only surfaced when I updated Spark.

On Wed, Jan 7, 2015 at 9:12 AM, Corey Nolet  wrote:

> We just updated to Spark 1.2.0 from Spark 1.1.0. We have a small framework
> that we've been developing that connects various different RDDs together
> based on some predefined business cases. After updating to 1.2.0, some of
> the concurrency expectations about how the stages within jobs are executed
> have changed quite significantly.
>
> Given 3 RDDs:
>
> RDD1 = inputFromHDFS().groupBy().sortBy().etc().cache()
> RDD2 = RDD1.outputToFile
> RDD3 = RDD1.groupBy().outputToFile
>
> In Spark 1.1.0, we expected RDD1 to be scheduled based on the first stage
> encountered (RDD2's outputToFile or RDD3's groupBy()) and then for RDD2 and
> RDD3 to both block waiting for RDD1 to complete and cache- at which point
> RDD2 and RDD3 both use the cached version to complete their work.
>
> Spark 1.2.0 seems to schedule two (be it concurrently running) stages for
> each of RDD1's stages (inputFromHDFS, groupBy(), sortBy(), etc() will each
> get run twice). It does not look like there is any sharing of the results
> between these jobs.
>
> Are we doing something wrong? Is there a setting that I'm not
> understanding somewhere?
>


Strange DAG scheduling behavior on currently dependent RDDs

2015-01-07 Thread Corey Nolet
We just updated to Spark 1.2.0 from Spark 1.1.0. We have a small framework
that we've been developing that connects various different RDDs together
based on some predefined business cases. After updating to 1.2.0, some of
the concurrency expectations about how the stages within jobs are executed
have changed quite significantly.

Given 3 RDDs:

RDD1 = inputFromHDFS().groupBy().sortBy().etc().cache()
RDD2 = RDD1.outputToFile
RDD3 = RDD1.groupBy().outputToFile

In Spark 1.1.0, we expected RDD1 to be scheduled based on the first stage
encountered (RDD2's outputToFile or RDD3's groupBy()) and then for RDD2 and
RDD3 to both block waiting for RDD1 to complete and cache- at which point
RDD2 and RDD3 both use the cached version to complete their work.

Spark 1.2.0 seems to schedule two (albeit concurrently running) stages for
each of RDD1's stages (inputFromHDFS, groupBy(), sortBy(), etc() will each
get run twice). It does not look like there is any sharing of the results
between these jobs.

Are we doing something wrong? Is there a setting that I'm not understanding
somewhere?


Re: DAG info

2015-01-03 Thread madhu phatak
Hi,
You can turn off these messages using log4j.properties.
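
For example, a minimal conf/log4j.properties along these lines (based on the
template Spark ships with) suppresses the routine INFO output:

# Raise the default level so routine INFO messages are suppressed
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Or, more selectively, quiet just the DAG scheduler
log4j.logger.org.apache.spark.scheduler.DAGScheduler=WARN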

On Fri, Jan 2, 2015 at 1:51 PM, Robineast  wrote:

> Do you have some example code of what you are trying to do?
>
> Robin
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/DAG-info-tp20940p20941.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Regards,
Madhukara Phatak
http://www.madhukaraphatak.com


Re: DAG info

2015-01-02 Thread Robineast
Do you have some example code of what you are trying to do?

Robin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DAG-info-tp20940p20941.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DAG info

2015-01-01 Thread Josh Rosen
This log message is normal; in this case, this message is saying that the
final stage needed to compute your job does not have any dependencies /
parent stages and that there are no parent stages that need to be computed.

On Thu, Jan 1, 2015 at 11:02 PM, shahid  wrote:

> hi guys
>
>
> i have just starting using spark, i am getting this as an info
> 15/01/02 11:54:17 INFO DAGScheduler: Parents of final stage: List()
> 15/01/02 11:54:17 INFO DAGScheduler: Missing parents: List()
> 15/01/02 11:54:17 INFO DAGScheduler: Submitting Stage 6 (PythonRDD[12] at
> RDD at PythonRDD.scala:43), which has no missing parents
>
> Also my program is taking lot of time to execute.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/DAG-info-tp20940.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


DAG info

2015-01-01 Thread shahid
hi guys


I have just started using Spark, and I am getting this as an INFO message:
15/01/02 11:54:17 INFO DAGScheduler: Parents of final stage: List()
15/01/02 11:54:17 INFO DAGScheduler: Missing parents: List()
15/01/02 11:54:17 INFO DAGScheduler: Submitting Stage 6 (PythonRDD[12] at
RDD at PythonRDD.scala:43), which has no missing parents

Also, my program is taking a lot of time to execute.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DAG-info-tp20940.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



DAG info

2015-01-01 Thread shahid ashraf
hi guys


I have just started using Spark, and I am getting this as an INFO message:
15/01/02 11:54:17 INFO DAGScheduler: Parents of final stage: List()
15/01/02 11:54:17 INFO DAGScheduler: Missing parents: List()
15/01/02 11:54:17 INFO DAGScheduler: Submitting Stage 6 (PythonRDD[12] at
RDD at PythonRDD.scala:43), which has no missing parents

and my program is taking a lot of time to execute.
-- 
with Regards
Shahid Ashraf