Re: Re: spark-sql force parallel union

2018-11-21 Thread Alessandro Solimando
Hello,
maybe I am overlooking part of the problem, but I would go for something
like this:

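import org.apache.spark.sql.DataFrame

// Folds the rest of the list onto the first DataFrame; dfs must be non-empty.
def unionDFs(dfs: List[DataFrame]): DataFrame = {
   dfs.tail.foldLeft(dfs.head)((df1: DataFrame, df2: DataFrame) =>
      df1 union df2)
}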

(It would be better to keep dfs as-is and start the fold from an empty
DataFrame with the correct schema.)

This should create the DAG plan you are after, but I cannot run it at the
moment to confirm.

At this point you need no views and you will benefit from the parallelism.
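
To make the shape of such a fold concrete, here is a minimal sketch that runs
without a Spark session. It is generic over the element type, so with
DataFrames `op` would be `_ union _`. The names `UnionShapes`, `leftDeep` and
`balanced` are hypothetical helpers for illustration, not Spark APIs. The
sketch only shows how the unions nest, including the pairing asked about
earlier in the thread ("dfv1 union dfv2" and "dfv3 union dfv4" first). Whether
the nesting changes Spark's runtime is an open assumption and is not confirmed
here.

```scala
// Generic stand-in for unioning a list of DataFrames; no Spark needed to run it.
object UnionShapes {
  // Left-deep combination: ((a op b) op c) op d
  def leftDeep[A](xs: Seq[A])(op: (A, A) => A): A = {
    require(xs.nonEmpty, "need at least one element")
    xs.reduceLeft(op)
  }

  // Pairwise (balanced) combination: (a op b) op (c op d)
  def balanced[A](xs: Seq[A])(op: (A, A) => A): A = {
    require(xs.nonEmpty, "need at least one element")
    if (xs.size == 1) xs.head
    else {
      // Split in half, combine each half recursively, then combine the halves.
      val (left, right) = xs.splitAt(xs.size / 2)
      op(balanced(left)(op), balanced(right)(op))
    }
  }
}
```

Called with the view names and an `op` that just prints the nesting,
`(a: String, b: String) => s"($a union $b)"`, `leftDeep` yields
"(((dfv1 union dfv2) union dfv3) union dfv4)" and `balanced` yields
"((dfv1 union dfv2) union (dfv3 union dfv4))".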

What do you think?

Best regards,
Alessandro

On Wed, 21 Nov 2018 at 08:19, onmstester onmstester wrote:

> Thanks Kathleen,
>
> 1. So if I've got 4 DataFrames and I want "dfv1 union dfv2 union dfv3 union
> dfv4", would it first compute "dfv1 union dfv2" and "dfv3 union dfv4"
> independently and simultaneously, and then union their results?
> 2. There are going to be hundreds of partitions to union; might creating a
> temp view for each of them be slow?
>
>
>
>  Forwarded message 
> From : kathleen li 
> To : 
> Cc : 
> Date : Wed, 21 Nov 2018 10:16:21 +0330
> Subject : Re: spark-sql force parallel union
>  Forwarded message 
>
> You might first write code to construct the query statement with "union
> all", like below:
>
> scala> val query="select * from dfv1 union all select * from dfv2 union all select * from dfv3"
> query: String = select * from dfv1 union all select * from dfv2 union all select * from dfv3
>
> Then write a loop to register each partition as a view, like below:
>  for (i <- 1 to 3){
>   df.createOrReplaceTempView("dfv"+i)
>   }
>
> scala> spark.sql(query).explain
> == Physical Plan ==
> Union
> :- LocalTableScan [_1#0, _2#1, _3#2]
> :- LocalTableScan [_1#0, _2#1, _3#2]
> +- LocalTableScan [_1#0, _2#1, _3#2]
>
>
> You can use "rollup" or "grouping sets" over multiple dimensions to replace
> "union" or "union all".
>
> On Tue, Nov 20, 2018 at 8:34 PM onmstester onmstester <
> onmstes...@zoho.com.invalid> wrote:
>
>
> I'm using Spark SQL to query Cassandra tables. In Cassandra, I've
> partitioned my data by time bucket and one id, so depending on the query I
> need to union multiple partitions with Spark SQL and do the
> aggregations/group-by on the union result, something like this:
>
> for(all cassandra partitions){
> DataSet currentPartition = sqlContext.sql();
> unionResult = unionResult.union(currentPartition);
> }
>
> Increasing the input (number of loaded partitions) increases response time
> more than linearly, because the unions would be done sequentially.
>
> Because there is no harm in doing the unions in parallel, and I don't know
> how to force Spark to do them in parallel, right now I'm using a ThreadPool
> to asynchronously load all partitions in my application (which may cause
> OOM), and somehow do the sort or simple group-by in Java (which makes me
> wonder why I'm even using Spark at all).
>
> The short question is: how can I force Spark SQL to load Cassandra
> partitions in parallel while doing a union on them? Also, I don't want too
> many tasks in Spark; with my home-made async solution I use coalesce(1), so
> each task is very fast (it only waits on Cassandra).
>
>
>
>
>


Fwd: Re: spark-sql force parallel union

2018-11-20 Thread onmstester onmstester
Thanks Kathleen,

1. So if I've got 4 DataFrames and I want "dfv1 union dfv2 union dfv3 union
dfv4", would it first compute "dfv1 union dfv2" and "dfv3 union dfv4"
independently and simultaneously, and then union their results?
2. There are going to be hundreds of partitions to union; might creating a
temp view for each of them be slow?

 Forwarded message 
From : kathleen li
To :
Cc :
Date : Wed, 21 Nov 2018 10:16:21 +0330
Subject : Re: spark-sql force parallel union
 Forwarded message 

You might first write code to construct the query statement with "union
all", like below:

scala> val query="select * from dfv1 union all select * from dfv2 union all select * from dfv3"
query: String = select * from dfv1 union all select * from dfv2 union all select * from dfv3

Then write a loop to register each partition as a view, like below:
 for (i <- 1 to 3){
  df.createOrReplaceTempView("dfv"+i)
  }

scala> spark.sql(query).explain
== Physical Plan ==
Union
:- LocalTableScan [_1#0, _2#1, _3#2]
:- LocalTableScan [_1#0, _2#1, _3#2]
+- LocalTableScan [_1#0, _2#1, _3#2]

You can use "rollup" or "grouping sets" over multiple dimensions to replace
"union" or "union all".

On Tue, Nov 20, 2018 at 8:34 PM onmstester onmstester wrote:

I'm using Spark SQL to query Cassandra tables. In Cassandra, I've
partitioned my data by time bucket and one id, so depending on the query I
need to union multiple partitions with Spark SQL and do the
aggregations/group-by on the union result, something like this:

for(all cassandra partitions){
DataSet currentPartition = sqlContext.sql();
unionResult = unionResult.union(currentPartition);
}

Increasing the input (number of loaded partitions) increases response time
more than linearly, because the unions would be done sequentially.

Because there is no harm in doing the unions in parallel, and I don't know
how to force Spark to do them in parallel, right now I'm using a ThreadPool
to asynchronously load all partitions in my application (which may cause
OOM), and somehow do the sort or simple group-by in Java (which makes me
wonder why I'm even using Spark at all).

The short question is: how can I force Spark SQL to load Cassandra
partitions in parallel while doing a union on them? Also, I don't want too
many tasks in Spark; with my home-made async solution I use coalesce(1), so
each task is very fast (it only waits on Cassandra).

Re: spark-sql force parallel union

2018-11-20 Thread kathleen li
You might first write code to construct the query statement with "union
all", like below:

scala> val query="select * from dfv1 union all select * from dfv2 union all select * from dfv3"
query: String = select * from dfv1 union all select * from dfv2 union all select * from dfv3

Then write a loop to register each partition as a view, like below:
 for (i <- 1 to 3){
  df.createOrReplaceTempView("dfv"+i)
  }

scala> spark.sql(query).explain
== Physical Plan ==
Union
:- LocalTableScan [_1#0, _2#1, _3#2]
:- LocalTableScan [_1#0, _2#1, _3#2]
+- LocalTableScan [_1#0, _2#1, _3#2]
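
With hundreds of views, the query string could be generated rather than typed
by hand. The following is a minimal sketch in plain Scala, assuming the views
are named with a common prefix and a running number as in the example above.
`UnionQuery` and `unionAllQuery` are hypothetical names introduced here for
illustration.

```scala
object UnionQuery {
  // Builds "select * from dfv1 union all select * from dfv2 ..."
  // for views named <viewPrefix>1 .. <viewPrefix><count>.
  def unionAllQuery(viewPrefix: String, count: Int): String = {
    require(count >= 1, "need at least one view")
    (1 to count)
      .map(i => s"select * from ${viewPrefix}${i}")
      .mkString(" union all ")
  }
}
```

`UnionQuery.unionAllQuery("dfv", 3)` produces the same string as the
hand-written `query` above, which can then be passed to `spark.sql`.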


You can use "rollup" or "grouping sets" over multiple dimensions to replace
"union" or "union all".

On Tue, Nov 20, 2018 at 8:34 PM onmstester onmstester wrote:

> I'm using Spark SQL to query Cassandra tables. In Cassandra, I've
> partitioned my data by time bucket and one id, so depending on the query I
> need to union multiple partitions with Spark SQL and do the
> aggregations/group-by on the union result, something like this:
>
> for(all cassandra partitions){
> DataSet currentPartition = sqlContext.sql();
> unionResult = unionResult.union(currentPartition);
> }
>
> Increasing the input (number of loaded partitions) increases response time
> more than linearly, because the unions would be done sequentially.
>
> Because there is no harm in doing the unions in parallel, and I don't know
> how to force Spark to do them in parallel, right now I'm using a ThreadPool
> to asynchronously load all partitions in my application (which may cause
> OOM), and somehow do the sort or simple group-by in Java (which makes me
> wonder why I'm even using Spark at all).
>
> The short question is: how can I force Spark SQL to load Cassandra
> partitions in parallel while doing a union on them? Also, I don't want too
> many tasks in Spark; with my home-made async solution I use coalesce(1), so
> each task is very fast (it only waits on Cassandra).
>
>
>
>


spark-sql force parallel union

2018-11-20 Thread onmstester onmstester
I'm using Spark SQL to query Cassandra tables. In Cassandra, I've
partitioned my data by time bucket and one id, so depending on the query I
need to union multiple partitions with Spark SQL and do the
aggregations/group-by on the union result, something like this:

for(all cassandra partitions){
DataSet currentPartition = sqlContext.sql();
unionResult = unionResult.union(currentPartition);
}

Increasing the input (number of loaded partitions) increases response time
more than linearly, because the unions would be done sequentially.

Because there is no harm in doing the unions in parallel, and I don't know
how to force Spark to do them in parallel, right now I'm using a ThreadPool
to asynchronously load all partitions in my application (which may cause
OOM), and somehow do the sort or simple group-by in Java (which makes me
wonder why I'm even using Spark at all).

The short question is: how can I force Spark SQL to load Cassandra
partitions in parallel while doing a union on them? Also, I don't want too
many tasks in Spark; with my home-made async solution I use coalesce(1), so
each task is very fast (it only waits on Cassandra).