Re: [GRAPHX] Graph Algorithms and Spark

2016-04-21 Thread Denny Lee
BTW, we recently had a webinar on GraphFrames at
http://go.databricks.com/graphframes-dataframe-based-graphs-for-apache-spark

On Thu, Apr 21, 2016 at 14:30 Dimitris Kouzis - Loukas 
wrote:

> This thread is useful. Maybe it should make it into the docs or the user list.
>
> On Thu, Apr 21, 2016 at 9:25 PM, Zhan Zhang 
> wrote:
>
>>
>> You can take a look at this blog post from Databricks about GraphFrames
>>
>> https://databricks.com/blog/2016/03/03/introducing-graphframes.html
>>
>> Thanks.
>>
>> Zhan Zhang
>>
>> On Apr 21, 2016, at 12:53 PM, Robin East  wrote:
>>
>> Hi
>>
>> Aside from LDA, which is implemented in MLlib, GraphX has the following
>> built-in algorithms:
>>
>>
>>- PageRank/Personalised PageRank
>>- Connected Components
>>- Strongly Connected Components
>>- Triangle Count
>>- Shortest Paths
>>- Label Propagation
>>
>>
>> It also implements a version of the Pregel framework, a form of
>> bulk-synchronous parallel processing that is the foundation of most of the
>> above algorithms. We cover other algorithms in our book, and if you search
>> on Google you will find a number of other examples.
>>
>>
>> ---
>> Robin East
>> *Spark GraphX in Action* Michael Malak and Robin East
>> Manning Publications Co.
>> http://www.manning.com/books/spark-graphx-in-action
>>
>>
>>
>>
>>
>> On 21 Apr 2016, at 19:47, tgensol  wrote:
>>
>> Hi there,
>>
>> I am working in a group at the University of Michigan, and we are trying
>> to find, and then implement, some distributed graph algorithms.
>>
>> I know Spark, and I found GraphX. I read the docs, but Latent Dirichlet
>> Allocation is the only algorithm I found working with GraphX, so I was
>> wondering why.
>>
>> Basically, the group wants to implement Minimum Spanning Tree, kNN, and
>> shortest paths first.
>>
>> So my questions are:
>> Is GraphX stable enough to develop this kind of algorithm on top of it?
>> Do you know of algorithms like these that work on top of GraphX? If not,
>> why do you think nobody has tried? Is it too hard, or does nobody need it?
>>
>> Maybe it is only my knowledge of GraphX that is weak, and it is not
>> possible to implement these algorithms with GraphX.
>>
>> Thank you in advance,
>> Best regards,
>>
>> Thibaut
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-developers-list.1001551.n3.nabble.com/GRAPHX-Graph-Algorithms-and-Spark-tp17301.html
>> Sent from the Apache Spark Developers List mailing list archive at
>> Nabble.com .
>>
>> -
>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>> For additional commands, e-mail: dev-h...@spark.apache.org
>>
>>
>>
>>
>


Re: [GRAPHX] Graph Algorithms and Spark

2016-04-21 Thread Dimitris Kouzis - Loukas
This thread is useful. Maybe it should make it into the docs or the user list.

On Thu, Apr 21, 2016 at 9:25 PM, Zhan Zhang  wrote:

>
> You can take a look at this blog post from Databricks about GraphFrames
>
> https://databricks.com/blog/2016/03/03/introducing-graphframes.html
>
> Thanks.
>
> Zhan Zhang
>
> On Apr 21, 2016, at 12:53 PM, Robin East  wrote:
>
> Hi
>
> Aside from LDA, which is implemented in MLlib, GraphX has the following
> built-in algorithms:
>
>
>- PageRank/Personalised PageRank
>- Connected Components
>- Strongly Connected Components
>- Triangle Count
>- Shortest Paths
>- Label Propagation
>
>
> It also implements a version of the Pregel framework, a form of
> bulk-synchronous parallel processing that is the foundation of most of the
> above algorithms. We cover other algorithms in our book, and if you search
> on Google you will find a number of other examples.
>
>
> ---
> Robin East
> *Spark GraphX in Action* Michael Malak and Robin East
> Manning Publications Co.
> http://www.manning.com/books/spark-graphx-in-action
>
>
>
>
>
> On 21 Apr 2016, at 19:47, tgensol  wrote:
>
> Hi there,
>
> I am working in a group at the University of Michigan, and we are trying to
> find, and then implement, some distributed graph algorithms.
>
> I know Spark, and I found GraphX. I read the docs, but Latent Dirichlet
> Allocation is the only algorithm I found working with GraphX, so I was
> wondering why.
>
> Basically, the group wants to implement Minimum Spanning Tree, kNN, and
> shortest paths first.
>
> So my questions are:
> Is GraphX stable enough to develop this kind of algorithm on top of it?
> Do you know of algorithms like these that work on top of GraphX? If not, why
> do you think nobody has tried? Is it too hard, or does nobody need it?
>
> Maybe it is only my knowledge of GraphX that is weak, and it is not possible
> to implement these algorithms with GraphX.
>
> Thank you in advance,
> Best regards,
>
> Thibaut
>
>
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/GRAPHX-Graph-Algorithms-and-Spark-tp17301.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com .
>
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
>
>
>
>


Re: Improving system design logging in spark

2016-04-21 Thread Ali Tootoonchian
Hi,

My point for #2 is distinguishing how long each task takes to read the data
from disk from how long it takes to transfer it over the network to the target
node. As far as I know (correct me if I'm wrong), the block fetch time includes
both the remote node reading the data and transferring it to the requesting
node. If the block time is larger than we expect, we cannot tell, from a system
design perspective, which component is the weakest link: storage or the
network.



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Improving-system-design-logging-in-spark-tp17291p17308.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: RFC: Remote "HBaseTest" from examples?

2016-04-21 Thread Ted Yu
Zhan:
I mentioned the JIRA numbers in the thread starting with (note the typo in
the subject of this thread):

RFC: Remove ...

On Thu, Apr 21, 2016 at 1:28 PM, Zhan Zhang  wrote:

> FYI: There are several pending patches for DataFrame support on top of
> HBase.
>
> Thanks.
>
> Zhan Zhang
>
> On Apr 20, 2016, at 2:43 AM, Saisai Shao  wrote:
>
> +1. HBaseTest in the Spark examples is quite old and obsolete, and the HBase
> connector in the HBase repo has evolved a lot; it would be better to point
> users to that rather than to the Spark example. So it is good to remove it.
>
> Thanks
> Saisai
>
> On Wed, Apr 20, 2016 at 1:41 AM, Josh Rosen 
> wrote:
>
>> +1; I think that it's preferable for code examples, especially
>> third-party integration examples, to live outside of Spark.
>>
>> On Tue, Apr 19, 2016 at 10:29 AM Reynold Xin  wrote:
>>
>>> Yea in general I feel examples that bring in a large amount of
>>> dependencies should be outside Spark.
>>>
>>>
>>> On Tue, Apr 19, 2016 at 10:15 AM, Marcelo Vanzin 
>>> wrote:
>>>
 Hey all,

 Two reasons why I think we should remove that from the examples:

 - HBase now has Spark integration in its own repo, so that really
 should be the template for how to use HBase from Spark, making that
 example less useful, even misleading.

 - It brings up a lot of extra dependencies that make the size of the
 Spark distribution grow.

 Any reason why we shouldn't drop that example?

 --
 Marcelo

 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org


>>>
>
>


Re: [Spark-SQL] Reduce Shuffle Data by pushing filter toward storage

2016-04-21 Thread atootoonchian
I created an issue in the Spark project: SPARK-14820



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-SQL-Reduce-Shuffle-Data-by-pushing-filter-toward-storage-tp17297p17306.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: [GRAPHX] Graph Algorithms and Spark

2016-04-21 Thread Zhan Zhang

You can take a look at this blog post from Databricks about GraphFrames

https://databricks.com/blog/2016/03/03/introducing-graphframes.html
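
For anyone who wants to try it, a minimal spark-shell sketch is below (it
assumes the graphframes package is on the classpath, e.g. via --packages, and
an existing sqlContext; the vertex/edge data are made-up placeholders, and the
pageRank call follows the builder API shown in that blog post):

import org.graphframes.GraphFrame

// Toy vertex and edge DataFrames: GraphFrames expects an "id" column on
// vertices and "src"/"dst" columns on edges.
val vertices = sqlContext.createDataFrame(Seq(
  (1L, "alice"), (2L, "bob"), (3L, "carol")
)).toDF("id", "name")

val edges = sqlContext.createDataFrame(Seq(
  (1L, 2L, "follows"), (2L, 3L, "follows"), (3L, 1L, "follows")
)).toDF("src", "dst", "relationship")

val g = GraphFrame(vertices, edges)

// Run PageRank and inspect the per-vertex scores.
val results = g.pageRank.resetProbability(0.15).maxIter(10).run()
results.vertices.select("id", "pagerank").show()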

Thanks.

Zhan Zhang

On Apr 21, 2016, at 12:53 PM, Robin East 
> wrote:

Hi

Aside from LDA, which is implemented in MLlib, GraphX has the following
built-in algorithms:


  *   PageRank/Personalised PageRank
  *   Connected Components
  *   Strongly Connected Components
  *   Triangle Count
  *   Shortest Paths
  *   Label Propagation

It also implements a version of the Pregel framework, a form of
bulk-synchronous parallel processing that is the foundation of most of the
above algorithms. We cover other algorithms in our book, and if you search on
Google you will find a number of other examples.

---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action





On 21 Apr 2016, at 19:47, tgensol 
> wrote:

Hi there,

I am working in a group at the University of Michigan, and we are trying to
find, and then implement, some distributed graph algorithms.

I know Spark, and I found GraphX. I read the docs, but Latent Dirichlet
Allocation is the only algorithm I found working with GraphX, so I was
wondering why.

Basically, the group wants to implement Minimum Spanning Tree, kNN, and
shortest paths first.

So my questions are:
Is GraphX stable enough to develop this kind of algorithm on top of it?
Do you know of algorithms like these that work on top of GraphX? If not, why
do you think nobody has tried? Is it too hard, or does nobody need it?

Maybe it is only my knowledge of GraphX that is weak, and it is not possible
to implement these algorithms with GraphX.

Thank you in advance,
Best regards,

Thibaut



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/GRAPHX-Graph-Algorithms-and-Spark-tp17301.html
Sent from the Apache Spark Developers List mailing list archive at 
Nabble.com.

-
To unsubscribe, e-mail: 
dev-unsubscr...@spark.apache.org
For additional commands, e-mail: 
dev-h...@spark.apache.org





Re: RFC: Remote "HBaseTest" from examples?

2016-04-21 Thread Zhan Zhang
FYI: There are several pending patches for DataFrame support on top of HBase.

Thanks.

Zhan Zhang

On Apr 20, 2016, at 2:43 AM, Saisai Shao 
> wrote:

+1. HBaseTest in the Spark examples is quite old and obsolete, and the HBase
connector in the HBase repo has evolved a lot; it would be better to point
users to that rather than to the Spark example. So it is good to remove it.

Thanks
Saisai

On Wed, Apr 20, 2016 at 1:41 AM, Josh Rosen 
> wrote:
+1; I think that it's preferable for code examples, especially third-party 
integration examples, to live outside of Spark.

On Tue, Apr 19, 2016 at 10:29 AM Reynold Xin 
> wrote:
Yea in general I feel examples that bring in a large amount of dependencies 
should be outside Spark.


On Tue, Apr 19, 2016 at 10:15 AM, Marcelo Vanzin 
> wrote:
Hey all,

Two reasons why I think we should remove that from the examples:

- HBase now has Spark integration in its own repo, so that really
should be the template for how to use HBase from Spark, making that
example less useful, even misleading.

- It brings up a lot of extra dependencies that make the size of the
Spark distribution grow.

Any reason why we shouldn't drop that example?

--
Marcelo

-
To unsubscribe, e-mail: 
dev-unsubscr...@spark.apache.org
For additional commands, e-mail: 
dev-h...@spark.apache.org






Re: [GRAPHX] Graph Algorithms and Spark

2016-04-21 Thread Robin East
Hi

Aside from LDA, which is implemented in MLlib, GraphX has the following
built-in algorithms:

PageRank/Personalised PageRank
Connected Components
Strongly Connected Components
Triangle Count
Shortest Paths
Label Propagation

It also implements a version of the Pregel framework, a form of
bulk-synchronous parallel processing that is the foundation of most of the
above algorithms. We cover other algorithms in our book, and if you search on
Google you will find a number of other examples.
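
To make that concrete, here is a minimal spark-shell sketch of calling a few
of the built-in algorithms (the edge-list path and landmark vertex ids are
placeholders; note that the built-in ShortestPaths computes unweighted hop
counts to the given landmarks):

import org.apache.spark.graphx.{GraphLoader, VertexId}
import org.apache.spark.graphx.lib.ShortestPaths

// Load a graph from a whitespace-separated edge list (placeholder path).
val graph = GraphLoader.edgeListFile(sc, "hdfs:///tmp/edges.txt")

val ranks      = graph.pageRank(0.001).vertices        // PageRank
val components = graph.connectedComponents().vertices  // Connected Components
val triangles  = graph.triangleCount().vertices        // Triangle Count

// Shortest Paths: hop counts from every vertex to the chosen landmarks.
val landmarks: Seq[VertexId] = Seq(1L, 4L)
val shortest = ShortestPaths.run(graph, landmarks).vertices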

---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action 






> On 21 Apr 2016, at 19:47, tgensol  wrote:
> 
> Hi there,
> 
> I am working in a group at the University of Michigan, and we are trying to
> find, and then implement, some distributed graph algorithms.
>
> I know Spark, and I found GraphX. I read the docs, but Latent Dirichlet
> Allocation is the only algorithm I found working with GraphX, so I was
> wondering why.
>
> Basically, the group wants to implement Minimum Spanning Tree, kNN, and
> shortest paths first.
>
> So my questions are:
> Is GraphX stable enough to develop this kind of algorithm on top of it?
> Do you know of algorithms like these that work on top of GraphX? If not, why
> do you think nobody has tried? Is it too hard, or does nobody need it?
>
> Maybe it is only my knowledge of GraphX that is weak, and it is not possible
> to implement these algorithms with GraphX.
>
> Thank you in advance,
> Best regards,
> 
> Thibaut 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/GRAPHX-Graph-Algorithms-and-Spark-tp17301.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
> 



Re: [GRAPHX] Graph Algorithms and Spark

2016-04-21 Thread Krishna Sankar
Hi,

   1. Yep, GraphX is stable and would be a good choice for you to implement
   algorithms. For a quick intro you can refer to our Strata MLlib tutorial
   GraphX slides http://goo.gl/Ffq2Az
   2. GraphX has implemented algorithms like PageRank &
   ConnectedComponents[1]
   3. It also has primitives to develop the kind of algorithms that you are
   talking about
   4. To implement interesting algorithms, the main APIs of interest would be
   the pregel API and the aggregateMessages API[2]; a minimal sketch follows
   below. I am sure you will also use the map*, subgraph, and join APIs.
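
As a concrete example of the pregel API, below is a sketch of single-source
shortest paths over a tiny made-up weighted graph (essentially the standard
pattern from the GraphX programming guide; sc is a spark-shell SparkContext
and the vertex ids/weights are placeholders). aggregateMessages follows the
same send/merge idea for a single round:

import org.apache.spark.graphx._

// A tiny weighted graph, purely for illustration.
val edges = sc.parallelize(Seq(
  Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0)))
val graph = Graph.fromEdges(edges, defaultValue = 0.0)

// Single-source shortest paths from vertex 1 via Pregel.
val sourceId: VertexId = 1L
val initialGraph = graph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist),        // vertex program
  triplet =>                                             // send messages
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    else Iterator.empty,
  (a, b) => math.min(a, b)                               // merge messages
)

sssp.vertices.collect().foreach(println)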

Cheers

[1]
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.GraphOps
[2]
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph

On Thu, Apr 21, 2016 at 11:47 AM, tgensol 
wrote:

> Hi there,
>
> I am working in a group at the University of Michigan, and we are trying to
> find, and then implement, some distributed graph algorithms.
>
> I know Spark, and I found GraphX. I read the docs, but Latent Dirichlet
> Allocation is the only algorithm I found working with GraphX, so I was
> wondering why.
>
> Basically, the group wants to implement Minimum Spanning Tree, kNN, and
> shortest paths first.
>
> So my questions are:
> Is GraphX stable enough to develop this kind of algorithm on top of it?
> Do you know of algorithms like these that work on top of GraphX? If not, why
> do you think nobody has tried? Is it too hard, or does nobody need it?
>
> Maybe it is only my knowledge of GraphX that is weak, and it is not possible
> to implement these algorithms with GraphX.
>
> Thank you in advance,
> Best regards,
>
> Thibaut
>
>
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/GRAPHX-Graph-Algorithms-and-Spark-tp17301.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
>
>


[GRAPHX] Graph Algorithms and Spark

2016-04-21 Thread tgensol
Hi there,

I am working in a group at the University of Michigan, and we are trying to
find, and then implement, some distributed graph algorithms.

I know Spark, and I found GraphX. I read the docs, but Latent Dirichlet
Allocation is the only algorithm I found working with GraphX, so I was
wondering why.

Basically, the group wants to implement Minimum Spanning Tree, kNN, and
shortest paths first.

So my questions are:
Is GraphX stable enough to develop this kind of algorithm on top of it?
Do you know of algorithms like these that work on top of GraphX? If not, why
do you think nobody has tried? Is it too hard, or does nobody need it?

Maybe it is only my knowledge of GraphX that is weak, and it is not possible
to implement these algorithms with GraphX.

Thank you in advance,
Best regards,

Thibaut 



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/GRAPHX-Graph-Algorithms-and-Spark-tp17301.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: [Spark-SQL] Reduce Shuffle Data by pushing filter toward storage

2016-04-21 Thread Ted Yu
Interesting analysis. 

Can you log a JIRA?

> On Apr 21, 2016, at 11:07 AM, atootoonchian  wrote:
> 
> The SQL query planner can be smart enough to push filters down towards the
> storage layer. If we optimize the query planner so that IO to storage is
> reduced at the cost of running multiple filters (i.e., extra compute), this
> is desirable when the system is IO bound. An example that proves the point,
> taken from the TPC-H benchmark, is below:
> 
> Let’s look at query q19 of the TPC-H benchmark.
> select
>sum(l_extendedprice* (1 - l_discount)) as revenue
> from lineitem, part
> where
>  ( p_partkey = l_partkey
>and p_brand = 'Brand#12'
>and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
>and l_quantity >= 1 and l_quantity <= 1 + 10
>and p_size between 1 and 5
>and l_shipmode in ('AIR', 'AIR REG')
>and l_shipinstruct = 'DELIVER IN PERSON')
>  or
>  ( p_partkey = l_partkey
>and p_brand = 'Brand#23'
>and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
>and l_quantity >= 10 and l_quantity <= 10 + 10
>and p_size between 1 and 10
>and l_shipmode in ('AIR', 'AIR REG')
>and l_shipinstruct = 'DELIVER IN PERSON')
>  or
>  ( p_partkey = l_partkey
>and p_brand = 'Brand#34'
>and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
>and l_quantity >= 20 and l_quantity <= 20 + 10
>and p_size between 1 and 15
>and l_shipmode in ('AIR', 'AIR REG')
>and l_shipinstruct = 'DELIVER IN PERSON')
> 
> The latest version of Spark creates the following plan (not the exact output;
> edited to be more readable) to execute q19.
> Aggregate [(sum(cast((l_extendedprice * (1.0 - l_discount))
>  Project [l_extendedprice,l_discount]
>Join Inner, Some(((p_partkey = l_partkey) && 
> ((
>   (p_brand = Brand#12) && 
>p_container IN (SM CASE,SM BOX,SM PACK,SM PKG)) && 
>   (l_quantity >= 1.0)) && (l_quantity <= 11.0)) && 
>   (p_size <= 5)) || 
> (p_brand = Brand#23) && 
> p_container IN (MED BAG,MED BOX,MED PKG,MED PACK)) && 
>(l_quantity >= 10.0)) && (l_quantity <= 20.0)) && 
>(p_size <= 10))) || 
> (p_brand = Brand#34) && 
> p_container IN (LG CASE,LG BOX,LG PACK,LG PKG)) && 
>(l_quantity >= 20.0)) && (l_quantity <= 30.0)) && 
>(p_size <= 15)
>  Project [l_partkey, l_quantity, l_extendedprice, l_discount]
>Filter ((isnotnull(l_partkey) && 
>(isnotnull(l_shipinstruct) && 
>(l_shipmode IN (AIR,AIR REG) && 
>(l_shipinstruct = DELIVER IN PERSON
>  LogicalRDD [l_orderkey, l_partkey, l_suppkey, l_linenumber,
> l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus,
> l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode,
> l_comment], MapPartitionsRDD[316] 
>  Project [p_partkey, p_brand, p_size, p_container]
>Filter ((isnotnull(p_partkey) && 
>(isnotnull(p_size) && 
>(cast(cast(p_size as decimal(20,0)) as int) >= 1)))
>  LogicalRDD [p_partkey, p_name, p_mfgr, p_brand, p_type, p_size,
> p_container, p_retailprice, p_comment], MapPartitionsRDD[314]
> 
> As you can see, only three filters are pushed down before the join is
> executed.
>  l_shipmode IN (AIR,AIR REG)
>  l_shipinstruct = DELIVER IN PERSON
>  (cast(cast(p_size as decimal(20,0)) as int) >= 1)
> 
> And the following filters are applied during the join process
>  p_brand = Brand#12
>  p_container IN (SM CASE,SM BOX,SM PACK,SM PKG) 
>  l_quantity >= 1.0 && l_quantity <= 11.0 
>  p_size <= 5  
>  p_brand = Brand#23 
>  p_container IN (MED BAG,MED BOX,MED PKG,MED PACK) 
>  l_quantity >= 10.0 && l_quantity <= 20.0 
>  p_size <= 10 
>  p_brand = Brand#34 
>  p_container IN (LG CASE,LG BOX,LG PACK,LG PKG) 
>  l_quantity >= 20.0 && l_quantity <= 30.0
>  p_size <= 15
> 
> Let’s look at the following sequence of SQL commands, which produces the
> same result.
> val partDfFilter = sqlContext.sql("""
>|select p_brand, p_partkey from part 
>|where
>| (p_brand = 'Brand#12'
>|   and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
>|   and p_size between 1 and 5)
>| or
>| (p_brand = 'Brand#23'
>|   and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
>|   and p_size between 1 and 10)
>| or
>| (p_brand = 'Brand#34'
>|   and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
>|   and p_size between 1 and 15)
>   """.stripMargin)
> 
> val itemLineDfFilter = sqlContext.sql("""
>|select 
>| l_partkey, l_quantity, l_extendedprice, l_discount from lineitem
>|where
>| (l_quantity >= 1 and l_quantity <= 30
>|   and l_shipmode in ('AIR', 'AIR REG')
>|   and l_shipinstruct = 'DELIVER IN PERSON')
>  """.stripMargin)
> 
> 

Re: [Spark-SQL] Reduce Shuffle Data by pushing filter toward storage

2016-04-21 Thread atootoonchian
Hi Marcin

I attached a PDF version of the issue:

Reduce_Shuffle_Data_by_pushing_filter_toward_storage.pdf

  



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-SQL-Reduce-Shuffle-Data-by-pushing-filter-toward-storage-tp17297p17299.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: [Spark-SQL] Reduce Shuffle Data by pushing filter toward storage

2016-04-21 Thread Marcin Tustin
I think that's an important result. Could you format your email to split
out your parts a little more? It all runs together for me in gmail, so it's
hard to follow, and I very much would like to.

On Thu, Apr 21, 2016 at 2:07 PM, atootoonchian  wrote:

> The SQL query planner can be smart enough to push filters down towards the
> storage layer. If we optimize the query planner so that IO to storage is
> reduced at the cost of running multiple filters (i.e., extra compute), this
> is desirable when the system is IO bound. An example that proves the point,
> taken from the TPC-H benchmark, is below:
>
> Let’s look at query q19 of the TPC-H benchmark.
> select
> sum(l_extendedprice* (1 - l_discount)) as revenue
> from lineitem, part
> where
>   ( p_partkey = l_partkey
> and p_brand = 'Brand#12'
> and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
> and l_quantity >= 1 and l_quantity <= 1 + 10
> and p_size between 1 and 5
> and l_shipmode in ('AIR', 'AIR REG')
> and l_shipinstruct = 'DELIVER IN PERSON')
>   or
>   ( p_partkey = l_partkey
> and p_brand = 'Brand#23'
> and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
> and l_quantity >= 10 and l_quantity <= 10 + 10
> and p_size between 1 and 10
> and l_shipmode in ('AIR', 'AIR REG')
> and l_shipinstruct = 'DELIVER IN PERSON')
>   or
>   ( p_partkey = l_partkey
> and p_brand = 'Brand#34'
> and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
> and l_quantity >= 20 and l_quantity <= 20 + 10
> and p_size between 1 and 15
> and l_shipmode in ('AIR', 'AIR REG')
> and l_shipinstruct = 'DELIVER IN PERSON')
>
> The latest version of Spark creates the following plan (not the exact output;
> edited to be more readable) to execute q19.
> Aggregate [(sum(cast((l_extendedprice * (1.0 - l_discount))
>   Project [l_extendedprice,l_discount]
> Join Inner, Some(((p_partkey = l_partkey) &&
> ((
>(p_brand = Brand#12) &&
> p_container IN (SM CASE,SM BOX,SM PACK,SM PKG)) &&
>(l_quantity >= 1.0)) && (l_quantity <= 11.0)) &&
>(p_size <= 5)) ||
> (p_brand = Brand#23) &&
>  p_container IN (MED BAG,MED BOX,MED PKG,MED PACK)) &&
> (l_quantity >= 10.0)) && (l_quantity <= 20.0)) &&
> (p_size <= 10))) ||
> (p_brand = Brand#34) &&
>  p_container IN (LG CASE,LG BOX,LG PACK,LG PKG)) &&
> (l_quantity >= 20.0)) && (l_quantity <= 30.0)) &&
> (p_size <= 15)
>   Project [l_partkey, l_quantity, l_extendedprice, l_discount]
> Filter ((isnotnull(l_partkey) &&
> (isnotnull(l_shipinstruct) &&
> (l_shipmode IN (AIR,AIR REG) &&
> (l_shipinstruct = DELIVER IN PERSON
>   LogicalRDD [l_orderkey, l_partkey, l_suppkey, l_linenumber,
> l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus,
> l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode,
> l_comment], MapPartitionsRDD[316]
>   Project [p_partkey, p_brand, p_size, p_container]
> Filter ((isnotnull(p_partkey) &&
> (isnotnull(p_size) &&
> (cast(cast(p_size as decimal(20,0)) as int) >= 1)))
>   LogicalRDD [p_partkey, p_name, p_mfgr, p_brand, p_type, p_size,
> p_container, p_retailprice, p_comment], MapPartitionsRDD[314]
>
> As you can see, only three filters are pushed down before the join is
> executed.
>   l_shipmode IN (AIR,AIR REG)
>   l_shipinstruct = DELIVER IN PERSON
>   (cast(cast(p_size as decimal(20,0)) as int) >= 1)
>
> And the following filters are applied during the join process
>   p_brand = Brand#12
>   p_container IN (SM CASE,SM BOX,SM PACK,SM PKG)
>   l_quantity >= 1.0 && l_quantity <= 11.0
>   p_size <= 5
>   p_brand = Brand#23
>   p_container IN (MED BAG,MED BOX,MED PKG,MED PACK)
>   l_quantity >= 10.0 && l_quantity <= 20.0
>   p_size <= 10
>   p_brand = Brand#34
>   p_container IN (LG CASE,LG BOX,LG PACK,LG PKG)
>   l_quantity >= 20.0 && l_quantity <= 30.0
>   p_size <= 15
>
> Let’s look at the following sequence of SQL commands, which produces the
> same result.
> val partDfFilter = sqlContext.sql("""
> |select p_brand, p_partkey from part
> |where
> | (p_brand = 'Brand#12'
> |   and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
> |   and p_size between 1 and 5)
> | or
> | (p_brand = 'Brand#23'
> |   and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED
> PACK')
> |   and p_size between 1 and 10)
> | or
> | (p_brand = 'Brand#34'
> |   and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
> |   and p_size between 1 and 15)
>""".stripMargin)
>
> val itemLineDfFilter = sqlContext.sql("""
> |select
> | l_partkey, l_quantity, l_extendedprice, l_discount from lineitem
> |where
> | 

[Spark-SQL] Reduce Shuffle Data by pushing filter toward storage

2016-04-21 Thread atootoonchian
The SQL query planner can be smart enough to push filters down towards the
storage layer. If we optimize the query planner so that IO to storage is
reduced at the cost of running multiple filters (i.e., extra compute), this
is desirable when the system is IO bound. An example that proves the point,
taken from the TPC-H benchmark, is below:

Let’s look at query q19 of the TPC-H benchmark.
select
sum(l_extendedprice* (1 - l_discount)) as revenue
from lineitem, part
where
  ( p_partkey = l_partkey
and p_brand = 'Brand#12'
and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
and l_quantity >= 1 and l_quantity <= 1 + 10
and p_size between 1 and 5
and l_shipmode in ('AIR', 'AIR REG')
and l_shipinstruct = 'DELIVER IN PERSON')
  or
  ( p_partkey = l_partkey
and p_brand = 'Brand#23'
and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
and l_quantity >= 10 and l_quantity <= 10 + 10
and p_size between 1 and 10
and l_shipmode in ('AIR', 'AIR REG')
and l_shipinstruct = 'DELIVER IN PERSON')
  or
  ( p_partkey = l_partkey
and p_brand = 'Brand#34'
and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
and l_quantity >= 20 and l_quantity <= 20 + 10
and p_size between 1 and 15
and l_shipmode in ('AIR', 'AIR REG')
and l_shipinstruct = 'DELIVER IN PERSON')

The latest version of Spark creates the following plan (not the exact output;
edited to be more readable) to execute q19.
Aggregate [(sum(cast((l_extendedprice * (1.0 - l_discount))
  Project [l_extendedprice,l_discount]
Join Inner, Some(((p_partkey = l_partkey) && 
((
   (p_brand = Brand#12) && 
p_container IN (SM CASE,SM BOX,SM PACK,SM PKG)) && 
   (l_quantity >= 1.0)) && (l_quantity <= 11.0)) && 
   (p_size <= 5)) || 
(p_brand = Brand#23) && 
 p_container IN (MED BAG,MED BOX,MED PKG,MED PACK)) && 
(l_quantity >= 10.0)) && (l_quantity <= 20.0)) && 
(p_size <= 10))) || 
(p_brand = Brand#34) && 
 p_container IN (LG CASE,LG BOX,LG PACK,LG PKG)) && 
(l_quantity >= 20.0)) && (l_quantity <= 30.0)) && 
(p_size <= 15)
  Project [l_partkey, l_quantity, l_extendedprice, l_discount]
Filter ((isnotnull(l_partkey) && 
(isnotnull(l_shipinstruct) && 
(l_shipmode IN (AIR,AIR REG) && 
(l_shipinstruct = DELIVER IN PERSON
  LogicalRDD [l_orderkey, l_partkey, l_suppkey, l_linenumber,
l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus,
l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode,
l_comment], MapPartitionsRDD[316] 
  Project [p_partkey, p_brand, p_size, p_container]
Filter ((isnotnull(p_partkey) && 
(isnotnull(p_size) && 
(cast(cast(p_size as decimal(20,0)) as int) >= 1)))
  LogicalRDD [p_partkey, p_name, p_mfgr, p_brand, p_type, p_size,
p_container, p_retailprice, p_comment], MapPartitionsRDD[314]

As you can see, only three filters are pushed down before the join is
executed.
  l_shipmode IN (AIR,AIR REG)
  l_shipinstruct = DELIVER IN PERSON
  (cast(cast(p_size as decimal(20,0)) as int) >= 1)

And the following filters are applied during the join process
  p_brand = Brand#12
  p_container IN (SM CASE,SM BOX,SM PACK,SM PKG) 
  l_quantity >= 1.0 && l_quantity <= 11.0 
  p_size <= 5  
  p_brand = Brand#23 
  p_container IN (MED BAG,MED BOX,MED PKG,MED PACK) 
  l_quantity >= 10.0 && l_quantity <= 20.0 
  p_size <= 10 
  p_brand = Brand#34 
  p_container IN (LG CASE,LG BOX,LG PACK,LG PKG) 
  l_quantity >= 20.0 && l_quantity <= 30.0
  p_size <= 15

Let’s look at the following sequence of SQL commands, which produces the same
result.
val partDfFilter = sqlContext.sql("""
|select p_brand, p_partkey from part 
|where
| (p_brand = 'Brand#12'
|   and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
|   and p_size between 1 and 5)
| or
| (p_brand = 'Brand#23'
|   and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
|   and p_size between 1 and 10)
| or
| (p_brand = 'Brand#34'
|   and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
|   and p_size between 1 and 15)
   """.stripMargin)

val itemLineDfFilter = sqlContext.sql("""
|select 
| l_partkey, l_quantity, l_extendedprice, l_discount from lineitem
|where
| (l_quantity >= 1 and l_quantity <= 30
|   and l_shipmode in ('AIR', 'AIR REG')
|   and l_shipinstruct = 'DELIVER IN PERSON')
  """.stripMargin)

partDfFilter.registerTempTable("partFilter")
itemLineDfFilter.registerTempTable("lineitemFilter")

var q19Query = """ 
|select
| sum(l_extendedprice* (1 - l_discount)) as revenue
|from
| lineitemFilter,
| partFilter
|where
| 
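
For anyone wanting to reproduce the comparison, here is a condensed sketch of
the rewrite described above (Spark 1.5/1.6-era API; it assumes a sqlContext
with the TPC-H lineitem and part tables registered, and keeps only the broad
filters; the per-brand, container, and quantity predicates from the original
q19 would still go in the final where clause). explain(true) prints the
logical and physical plans, which is how the filter placement can be compared
before and after:

val partFiltered = sqlContext.sql("""
    |select p_partkey, p_brand, p_size, p_container from part
    |where p_size between 1 and 15
  """.stripMargin)

val lineitemFiltered = sqlContext.sql("""
    |select l_partkey, l_quantity, l_extendedprice, l_discount from lineitem
    |where l_quantity >= 1 and l_quantity <= 30
    |  and l_shipmode in ('AIR', 'AIR REG')
    |  and l_shipinstruct = 'DELIVER IN PERSON'
  """.stripMargin)

partFiltered.registerTempTable("partFilter")
lineitemFiltered.registerTempTable("lineitemFilter")

// Compare the plans: explain(true) shows where the filters end up relative
// to the join, before and after the rewrite.
val q19 = sqlContext.sql("""
    |select sum(l_extendedprice * (1 - l_discount)) as revenue
    |from lineitemFilter, partFilter
    |where p_partkey = l_partkey
  """.stripMargin)
q19.explain(true)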


Re: Spark SQL and Hive give different results with the same SQL

2016-04-21 Thread FangFang Chen
Maybe I found the root cause in the Spark docs:
"Unlimited precision decimal columns are no longer supported, instead Spark SQL
enforces a maximum precision of 38. When inferring schema from BigDecimal
objects, a precision of (38, 18) is now used. When no precision is specified in
DDL then the default remains Decimal(10, 0)."
I get decimal(38,18) when I describe this table, but plain decimal when I show
the create-table statement. It seems Spark is getting the schema information
from the create-table side. Correct?
Is there any workaround for this problem, besides altering the Hive table
column type from decimal to decimal with an explicit precision?
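
For reference, the alter-column fix mentioned above would look roughly like
the sketch below (placeholder table t and DECIMAL column c), together with a
quick way to check the schema Spark ends up seeing:

// Run in the Hive shell first (this is the column type change referred to
// above):
//   ALTER TABLE t CHANGE COLUMN c c DECIMAL(38,18);

// Then verify what Spark sees through the Hive-backed sqlContext:
sqlContext.sql("describe t").show()
sqlContext.table("t").printSchema()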


Thanks






Sent from NetEase Mail Master
On 2016-04-20 20:47, Ted Yu wrote:
Do you mind trying out a build from the master branch?


1.5.3 is a bit old.



On Wed, Apr 20, 2016 at 5:25 AM, FangFang Chen  
wrote:

I found that Spark SQL loses precision and handles the data as an int
according to some rule. Below is the data obtained via the Hive shell and via
Spark SQL, with the same SQL against the same Hive table:
Hive:
0.4
0.5
1.8
0.4
0.49
1.5
Spark SQL:
1
2
2
It seems the handling rule is: when the fractional part is < 0.5 it becomes 0,
and when the fractional part is >= 0.5 it becomes 1.


Is this a bug or some configuration thing? Please give some suggestions. Thanks


Sent from NetEase Mail Master
On 2016-04-20 18:45, FangFang Chen wrote:
The output is:
Spark SQL: 6828127
Hive: 6980574.1269


Sent from NetEase Mail Master
On 2016-04-20 18:06, FangFang Chen wrote:
Hi all,
Please give some suggestions. Thanks


With the same SQL below, Spark SQL and Hive give different results. The SQL
sums a decimal(38,18) column:
Select sum(column) from table;
column is defined as decimal(38,18).


Spark version: 1.5.3
Hive version: 2.0.0


Sent from NetEase Mail Master