Fwd: Re: spark-sql force parallel union

2018-11-20 Thread onmstester onmstester
Thanks Kathleen,

1. So if I've got 4 DataFrames and I want "dfv1 union dfv2 union dfv3 union
dfv4", would it first compute "dfv1 union dfv2" and "dfv3 union dfv4"
independently and simultaneously, and then union their results?

2. It's going to be hundreds of partitions to union; might creating a temp
view for each of them be slow?

Sent using Zoho Mail

Forwarded message:
From: kathleen li
Date: Wed, 21 Nov 2018 10:16:21 +0330
Subject: Re: spark-sql force parallel union
[The forwarded reply and the original question appear in full in the two messages below.]

Re: spark-sql force parallel union

2018-11-20 Thread kathleen li
You might first write the code to construct a query statement with
"union all", like below:

scala> val query="select * from dfv1 union all select * from dfv2 union all
select * from dfv3"
query: String = select * from dfv1 union all select * from dfv2 union all
select * from dfv3

Then write a loop to register each partition as a temp view, like below:
for (i <- 1 to 3) {
  df.createOrReplaceTempView("dfv" + i)
}

scala> spark.sql(query).explain
== Physical Plan ==
Union
:- LocalTableScan [_1#0, _2#1, _3#2]
:- LocalTableScan [_1#0, _2#1, _3#2]
+- LocalTableScan [_1#0, _2#1, _3#2]


You can use "rollup" or "grouping sets" for multiple dimensions to replace
"union" or "union all".

On Tue, Nov 20, 2018 at 8:34 PM onmstester onmstester
 wrote:

> I'm using Spark-SQL to query Cassandra tables. In Cassandra, I've
> partitioned my data by a time bucket and an id, so based on the queries I
> need to union multiple partitions with Spark-SQL and do the
> aggregations/group-by on the union result, something like this:
>
> for (all cassandra partitions) {
>     DataSet currentPartition = sqlContext.sql();
>     unionResult = unionResult.union(currentPartition);
> }
>
> Increasing the input (the number of loaded partitions) increases the
> response time more than linearly, because the unions are done sequentially.
>
> Since there is no harm in doing the unions in parallel, and I don't know
> how to force Spark to do them in parallel, right now I'm using a ThreadPool
> to asynchronously load all partitions in my application (which may cause
> OOM) and somehow do the sort or a simple group-by in Java (which makes me
> wonder why I'm using Spark at all).
>
> The short question is: how can I force Spark-SQL to load the Cassandra
> partitions in parallel while doing a union on them? I also don't want too
> many tasks in Spark; with my home-made async solution I use coalesce(1), so
> each task is fast (only the wait time on Cassandra).
>
> Sent using Zoho Mail 
>
>
>


spark-sql force parallel union

2018-11-20 Thread onmstester onmstester
I'm using Spark-SQL to query Cassandra tables. In Cassandra, I've partitioned
my data by a time bucket and an id, so based on the queries I need to union
multiple partitions with Spark-SQL and do the aggregations/group-by on the
union result, something like this:

for (all cassandra partitions) {
    DataSet currentPartition = sqlContext.sql();
    unionResult = unionResult.union(currentPartition);
}

Increasing the input (the number of loaded partitions) increases the response
time more than linearly, because the unions are done sequentially.

Since there is no harm in doing the unions in parallel, and I don't know how
to force Spark to do them in parallel, right now I'm using a ThreadPool to
asynchronously load all partitions in my application (which may cause OOM)
and somehow do the sort or a simple group-by in Java (which makes me wonder
why I'm using Spark at all).

The short question is: how can I force Spark-SQL to load the Cassandra
partitions in parallel while doing a union on them? I also don't want too
many tasks in Spark; with my home-made async solution I use coalesce(1), so
each task is fast (only the wait time on Cassandra).

Sent using Zoho Mail
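
As a point of comparison with the loop above, a rough PySpark sketch of
building all per-partition DataFrames first and combining them in a single
plan before any action runs (load_partition() and the "id" column are
hypothetical placeholders):

from functools import reduce

buckets = range(1, 301)                      # hypothetical bucket keys
dfs = [load_partition(b) for b in buckets]   # transformations only, no jobs yet

# union() is lazy; the optimizer should collapse the nested unions into a
# single Union operator covering all the scans.
combined = reduce(lambda a, b: a.union(b), dfs)

result = combined.groupBy("id").count()      # example aggregation over the union
result.explain()                             # check that all scans sit under one Union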

Monthly Apache Spark Newsletter

2018-11-20 Thread Ankur Gupta
Hey Guys,

Just launched a monthly Apache Spark Newsletter.

https://newsletterspot.com/apache-spark/

Cheers,
Ankur

Sent from Mail for Windows 10



Is there any window operation for RDDs in Pyspark? like for DStreams

2018-11-20 Thread zakhavan
Hello,

I have two RDDs, and my goal is to calculate the Pearson correlation between
them using a sliding window. I want 200 samples in each window from rdd1 and
rdd2, calculate the correlation between them, then slide the window by 120
samples and calculate the correlation for the next 200 samples. I know
sliding windows work for DStreams, but I have to use RDDs instead of
DStreams. When I call a window function on an RDD I get an error saying the
RDD doesn't have a window attribute. The reason I need a window operation
here is that 1) rdd1 and rdd2 are infinite streams and I need to partition
them into smaller chunks like windows, and 2) the built-in Pearson
correlation function in PySpark only works on partitions of equal size, so in
my case I chose 200 samples per window and 120 samples as the sliding
interval.
I'd appreciate it if you have any ideas on how to solve this.

My code is here:

import sys

from pyspark import SparkContext
from pyspark.mllib.stat import Statistics

if __name__ == "__main__":
    sc = SparkContext(appName="CorrelationsExample")
    input_path1 = sys.argv[1]
    input_path2 = sys.argv[2]
    num_of_partitions = 1
    # read each input file and convert every sample to a float
    rdd1 = sc.textFile(input_path1, num_of_partitions) \
        .flatMap(lambda line1: line1.strip().split("\n")) \
        .map(lambda strelem1: float(strelem1))
    rdd2 = sc.textFile(input_path2, num_of_partitions) \
        .flatMap(lambda line2: line2.strip().split("\n")) \
        .map(lambda strelem2: float(strelem2))
    # collect and re-parallelize so both series have the same partitioning
    l1 = rdd1.collect()
    l2 = rdd2.collect()
    seriesX = sc.parallelize(l1)
    seriesY = sc.parallelize(l2)
    print("Correlation is: " + str(Statistics.corr(seriesX, seriesY, method="pearson")))
    sc.stop()
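
The code above collects everything and computes one global correlation. As a
sketch of one way to get overlapping windows without DStreams (window of 200
and slide of 120 as described; rdd1 and rdd2 as built above, assumed finite,
with zipWithIndex reflecting the sample order; the helper names are made up):

from math import sqrt

WINDOW, STEP = 200, 120

def window_ids(idx):
    # all window numbers w with w * STEP <= idx < w * STEP + WINDOW
    lo = max(0, -(-(idx - WINDOW + 1) // STEP))   # ceiling division
    hi = idx // STEP
    return range(lo, hi + 1)

def to_windows(rdd):
    # key every sample by (window id, position) so the two streams can be joined
    return rdd.zipWithIndex().flatMap(
        lambda vi: [((w, vi[1]), vi[0]) for w in window_ids(vi[1])])

def pearson(samples):
    # plain local Pearson correlation; each window holds at most WINDOW samples
    xs, ys = zip(*samples)
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = sqrt(sum((x - mx) ** 2 for x in xs))
    sy = sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy) if sx and sy else float("nan")

paired = to_windows(rdd1).join(to_windows(rdd2))           # ((w, idx), (x, y))
by_window = paired.map(lambda kv: (kv[0][0], kv[1])).groupByKey().mapValues(list)
correlations = (by_window
                .filter(lambda kv: len(kv[1]) == WINDOW)   # drop incomplete windows
                .mapValues(pearson)
                .sortByKey())
print(correlations.collect())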



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




Re: Spark DataSets and multiple write(.) calls

2018-11-20 Thread Gourav Sengupta
Hi,

This is interesting. Can you please share the code for this and, if possible,
the source schema? It would also be great if you could kindly share a sample
file.


Regards,
Gourav Sengupta

On Tue, Nov 20, 2018 at 9:50 AM Michael Shtelma  wrote:

>
> You can also cache the data frame on disk, if it does not fit into memory.
> An alternative would be to write out data frame as parquet and then read
> it, you can check if in this case the whole pipeline works faster as with
> the standard cache.
>
> Best,
> Michael
>
>
> On Tue, Nov 20, 2018 at 9:14 AM Dipl.-Inf. Rico Bergmann <
> i...@ricobergmann.de> wrote:
>
>> Hi!
>>
>> Thanks Vadim for your answer. But this would be like caching the dataset,
>> right? Or is checkpointing faster then persisting to memory or disk?
>>
>> I attach a pdf of my dataflow program. If I could compute the output of
>> outputs 1-5 in parallel the output of flatmap1 and groupBy could be reused,
>> avoiding to write to disk (at least until the grouping).
>>
>> Any other ideas or proposals?
>>
>> Best,
>>
>> Rico.
>>
>> On 19.11.2018 at 19:12, Vadim Semenov wrote:
>>
>> You can use checkpointing, in this case Spark will write out an rdd to
>> whatever destination you specify, and then the RDD can be reused from the
>> checkpointed state avoiding recomputing.
>>
>> On Mon, Nov 19, 2018 at 7:51 AM Dipl.-Inf. Rico Bergmann <
>> i...@ricobergmann.de> wrote:
>>
>>> Thanks for your advise. But I'm using Batch processing. Does anyone have
>>> a solution for the batch processing case?
>>>
>>> Best,
>>>
>>> Rico.
>>>
>>> On 19.11.2018 at 09:43, Magnus Nilsson wrote:
>>>
>>>
>>> I had the same requirements. As far as I know the only way is to extend
>>> the foreachwriter, cache the microbatch result and write to each output.
>>>
>>>
>>> https://docs.databricks.com/spark/latest/structured-streaming/foreach.html
>>>
>>> Unfortunately it seems as if you have to make a new connection "per
>>> batch" instead of creating one long lasting connections for the pipeline as
>>> such. Ie you might have to implement some sort of connection pooling by
>>> yourself depending on sink.
>>>
>>> Regards,
>>>
>>> Magnus
>>>
>>>
>>> On Mon, Nov 19, 2018 at 9:13 AM Dipl.-Inf. Rico Bergmann <
>>> i...@ricobergmann.de> wrote:
>>>
 Hi!

 I have a SparkSQL programm, having one input and 6 ouputs (write). When
 executing this programm every call to write(.) executes the plan. My
 problem is, that I want all these writes to happen in parallel (inside
 one execution plan), because all writes have a common and compute
 intensive subpart, that can be shared by all plans. Is there a
 possibility to do this? (Caching is not a solution because the input
 dataset is way to large...)

 Hoping for advises ...

 Best, Rico B.




>>>
>>>
>>>
>>
>>
>>
>> --
>> Sent from my iPhone
>>
>>
>>
>
>


Re: Spark DataSets and multiple write(.) calls

2018-11-20 Thread Michael Shtelma
You can also cache the data frame on disk if it does not fit into memory.
An alternative would be to write out the data frame as Parquet and then read
it back; you can check whether the whole pipeline runs faster that way than
with the standard cache.

Best,
Michael
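
A rough sketch of the two options above (the expensive_common_part() helper
and the path are placeholders, not something from this thread):

from pyspark import StorageLevel

shared = expensive_common_part(spark)   # hypothetical: the costly shared subplan

# Option 1: keep the cached data on disk instead of in memory
shared.persist(StorageLevel.DISK_ONLY)

# Option 2: materialize it as Parquet and read it back before the downstream writes
shared.write.mode("overwrite").parquet("/tmp/shared_intermediate")
shared = spark.read.parquet("/tmp/shared_intermediate")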


On Tue, Nov 20, 2018 at 9:14 AM Dipl.-Inf. Rico Bergmann <
i...@ricobergmann.de> wrote:

> Hi!
>
> Thanks Vadim for your answer. But this would be like caching the dataset,
> right? Or is checkpointing faster then persisting to memory or disk?
>
> I attach a pdf of my dataflow program. If I could compute the output of
> outputs 1-5 in parallel the output of flatmap1 and groupBy could be reused,
> avoiding to write to disk (at least until the grouping).
>
> Any other ideas or proposals?
>
> Best,
>
> Rico.
>
> On 19.11.2018 at 19:12, Vadim Semenov wrote:
>
> You can use checkpointing, in this case Spark will write out an rdd to
> whatever destination you specify, and then the RDD can be reused from the
> checkpointed state avoiding recomputing.
>
> On Mon, Nov 19, 2018 at 7:51 AM Dipl.-Inf. Rico Bergmann <
> i...@ricobergmann.de> wrote:
>
>> Thanks for your advise. But I'm using Batch processing. Does anyone have
>> a solution for the batch processing case?
>>
>> Best,
>>
>> Rico.
>>
>> On 19.11.2018 at 09:43, Magnus Nilsson wrote:
>>
>>
>> I had the same requirements. As far as I know the only way is to extend
>> the foreachwriter, cache the microbatch result and write to each output.
>>
>> https://docs.databricks.com/spark/latest/structured-streaming/foreach.html
>>
>> Unfortunately it seems as if you have to make a new connection "per
>> batch" instead of creating one long lasting connections for the pipeline as
>> such. Ie you might have to implement some sort of connection pooling by
>> yourself depending on sink.
>>
>> Regards,
>>
>> Magnus
>>
>>
>> On Mon, Nov 19, 2018 at 9:13 AM Dipl.-Inf. Rico Bergmann <
>> i...@ricobergmann.de> wrote:
>>
>>> Hi!
>>>
>>> I have a SparkSQL programm, having one input and 6 ouputs (write). When
>>> executing this programm every call to write(.) executes the plan. My
>>> problem is, that I want all these writes to happen in parallel (inside
>>> one execution plan), because all writes have a common and compute
>>> intensive subpart, that can be shared by all plans. Is there a
>>> possibility to do this? (Caching is not a solution because the input
>>> dataset is way to large...)
>>>
>>> Hoping for advises ...
>>>
>>> Best, Rico B.
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>>
>
>
>
> --
> Sent from my iPhone
>
>
>


Re: Spark DataSets and multiple write(.) calls

2018-11-20 Thread Dipl.-Inf. Rico Bergmann
Hi!

Thanks Vadim for your answer. But this would be like caching the dataset,
right? Or is checkpointing faster than persisting to memory or disk?

I attach a PDF of my dataflow program. If I could compute outputs 1-5 in
parallel, the output of flatmap1 and groupBy could be reused, avoiding the
write to disk (at least until the grouping).

Any other ideas or proposals?

Best,

Rico.
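
For reference, a minimal sketch of the checkpointing approach referred to
above and quoted below (the directory and the expensive_common_part() helper
are hypothetical):

spark.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints")  # any reliable path

shared = expensive_common_part(spark)    # hypothetical shared, compute-intensive subplan
shared = shared.checkpoint(eager=True)   # materializes the result and truncates the lineage

# The individual write(...) calls can then start from `shared` without
# recomputing the common part.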


On 19.11.2018 at 19:12, Vadim Semenov wrote:
> You can use checkpointing, in this case Spark will write out an rdd to
> whatever destination you specify, and then the RDD can be reused from
> the checkpointed state avoiding recomputing.
>
> On Mon, Nov 19, 2018 at 7:51 AM Dipl.-Inf. Rico Bergmann
> mailto:i...@ricobergmann.de>> wrote:
>
> Thanks for your advise. But I'm using Batch processing. Does
> anyone have a solution for the batch processing case?
>
> Best,
>
> Rico.
>
>
> On 19.11.2018 at 09:43, Magnus Nilsson wrote:
>>
>>
>>
>> I had the same requirements. As far as I know the only way is to
>> extend the foreachwriter, cache the microbatch result and write
>> to each output.
>>
>> 
>> https://docs.databricks.com/spark/latest/structured-streaming/foreach.html
>>
>> Unfortunately it seems as if you have to make a new connection
>> "per batch" instead of creating one long lasting connections for
>> the pipeline as such. Ie you might have to implement some sort of
>> connection pooling by yourself depending on sink. 
>>
>> Regards,
>>
>> Magnus
>>
>>
>> On Mon, Nov 19, 2018 at 9:13 AM Dipl.-Inf. Rico Bergmann
>> mailto:i...@ricobergmann.de>> wrote:
>>
>> Hi!
>>
>> I have a SparkSQL programm, having one input and 6 ouputs
>> (write). When
>> executing this programm every call to write(.) executes the
>> plan. My
>> problem is, that I want all these writes to happen in
>> parallel (inside
>> one execution plan), because all writes have a common and compute
>> intensive subpart, that can be shared by all plans. Is there a
>> possibility to do this? (Caching is not a solution because
>> the input
>> dataset is way to large...)
>>
>> Hoping for advises ...
>>
>> Best, Rico B.
>>
>>
>>
>>
>>
>
>
> 
> 
>
>
>
>
> -- 
> Sent from my iPhone





ExecutionDAG.pdf
Description: Adobe PDF document
