Michael, what I mean is that although preparing and repartitioning the
underlying data can't avoid the shuffle introduced by Spark SQL (Yin has
explained why), it does help to reduce network IO.
On 1/21/15 10:01 AM, Yin Huai wrote:
Hello Michael,
In Spark SQL, we have our internal concepts of Output Partitioning
(representing the partitioning scheme of an operator's output) and
Required Child Distribution (representing the requirement of input
data distribution of an operator) for a physical operator. Let's say
we have two operators, parent and child, and the parent takes the
output of the child as its input. At the end of the query planning
process, whenever the Output Partitioning of the child does not
satisfy the Required Child Distribution of the parent, we will add an
Exchange operator between the parent and child to shuffle the data.
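
The rule described above can be sketched as a small toy model in plain Scala. This is only an illustration under stated assumptions: the types below (Scan, Aggregate, Exchange, HashPartitioning, ClusteredDistribution) and the ensureRequirements helper are simplified stand-ins written for this example, not Spark SQL's actual classes, and the column-matching check is far cruder than what a real planner does.

```scala
// Toy model only: these are hypothetical stand-ins, not Spark SQL's real classes.
sealed trait Partitioning
case object UnknownPartitioning extends Partitioning
final case class HashPartitioning(columns: Seq[String], numPartitions: Int) extends Partitioning

sealed trait Distribution
final case class ClusteredDistribution(columns: Seq[String]) extends Distribution

sealed trait Operator {
  def outputPartitioning: Partitioning
}

// A table scan: the partitioning scheme of the input table is not recorded.
final case class Scan(table: String) extends Operator {
  val outputPartitioning: Partitioning = UnknownPartitioning
}

// A shuffle that repartitions its child's output.
final case class Exchange(newPartitioning: HashPartitioning, child: Operator) extends Operator {
  val outputPartitioning: Partitioning = newPartitioning
}

// A GROUP BY: requires its input to be clustered on the grouping columns.
final case class Aggregate(groupBy: Seq[String], child: Operator) extends Operator {
  def requiredChildDistribution: Distribution = ClusteredDistribution(groupBy)
  val outputPartitioning: Partitioning = child.outputPartitioning
}

object ExchangeSketch {
  // Simplified check: the child must be hash partitioned on exactly the required columns.
  def satisfies(p: Partitioning, d: Distribution): Boolean = (p, d) match {
    case (HashPartitioning(cols, _), ClusteredDistribution(required)) => cols == required
    case _ => false
  }

  // Insert an Exchange only when the child's output partitioning does not
  // satisfy the parent's required child distribution.
  def ensureRequirements(agg: Aggregate, shufflePartitions: Int): Aggregate =
    if (satisfies(agg.child.outputPartitioning, agg.requiredChildDistribution)) agg
    else agg.copy(child = Exchange(HashPartitioning(agg.groupBy, shufflePartitions), agg.child))

  def main(args: Array[String]): Unit = {
    val planned = ensureRequirements(Aggregate(Seq("CustomerCode"), Scan("Orders")), 200)
    println(planned) // the Scan is now wrapped in an Exchange
  }
}
```

In this model a Scan always reports UnknownPartitioning, so an Aggregate directly over a Scan always gets an Exchange, no matter how the underlying data was actually laid out.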
Right now, we do not record the partitioning scheme of an input table.
So, I think even if you use partitionBy (or DISTRIBUTE BY in SQL) to
prepare your data, you will still see the Exchange operator, and your
GROUP BY operation will be executed in a new stage (after the Exchange).
Making Spark SQL aware of the partitioning scheme of input tables is a
useful optimization. I just created
https://issues.apache.org/jira/browse/SPARK-5354 to track it.
Thanks,
Yin
On Wed, Jan 21, 2015 at 8:43 AM, Michael Davies wrote:
Hi Cheng,
Are you saying that by setting up the lineage
schemaRdd.keyBy(_.getString(1)).partitionBy(new HashPartitioner(n)).values.applySchema(schema)
Spark SQL will know that a SQL “group by” on Customer Code
will not have to shuffle?
But the prepared data will already have been shuffled, so we pay an
upfront cost for future groupings (assuming we cache, I suppose).
Mick
On 20 Jan 2015, at 20:44, Cheng Lian wrote:
First of all, even if the underlying dataset is partitioned as
expected, a shuffle can’t be avoided, because Spark SQL knows
nothing about the underlying data distribution. However, this
does reduce network IO.
You can prepare your data like this (say CustomerCode is a
string field with ordinal 1):

    val schemaRdd = sql(...)
    val schema = schemaRdd.schema
    val prepared = schemaRdd.keyBy(_.getString(1)).partitionBy(new HashPartitioner(n)).values.applySchema(schema)

n should be equal to spark.sql.shuffle.partitions.
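
To illustrate why the partition count matters, here is a minimal sketch of hash partitioning in plain Scala, with no Spark dependency. The names partitionFor and partitionBy are hypothetical helpers written for this example. The arithmetic (the key's hashCode modulo n, kept non-negative) is how a hash partitioner assigns keys; whether the shuffle added by Spark SQL hashes keys in exactly the same way is an assumption here, not something this thread confirms.

```scala
object HashPartitionSketch {
  // Partition index for a key: hashCode modulo n, adjusted to be non-negative.
  def partitionFor(key: Any, n: Int): Int = {
    val raw = key.hashCode % n
    if (raw < 0) raw + n else raw
  }

  // Group (key, value) rows into buckets by the hash of the key.
  def partitionBy[K, V](rows: Seq[(K, V)], n: Int): Map[Int, Seq[(K, V)]] =
    rows.groupBy { case (k, _) => partitionFor(k, n) }

  def main(args: Array[String]): Unit = {
    // Made-up sample rows: (CustomerCode, Cost).
    val orders = Seq("C1" -> 10.0, "C2" -> 5.0, "C1" -> 2.5, "C3" -> 1.0)
    partitionBy(orders, 4).foreach { case (index, rows) =>
      println(s"partition $index: $rows")
    }
  }
}
```

All rows with the same key land in the same bucket, and with the same n and the same hash function a key always maps to the same index. With a different n the index can change, which is one reason to keep n in line with the shuffle partition count.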
Cheng
On 1/19/15 7:44 AM, Mick Davies wrote:
Is it possible to use a HashPartitioner or something similar to distribute a
SchemaRDD's data by the hash of a particular column or set of columns?
Having done this, I would then hope that GROUP BY could avoid a shuffle.
E.g. set up a HashPartitioner on the CustomerCode field so that
SELECT CustomerCode, SUM(Cost)
FROM Orders
GROUP BY CustomerCode
would not need to shuffle.
Cheers
Mick
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/SQL-Using-HashPartitioner-to-distribute-by-column-tp21237.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.