On Wed, Feb 8, 2017 at 1:14 PM, ayan guha <guha.a...@gmail.com> wrote:

> Will a sql solution will be acceptable?
>

I'm very curious to see how it'd be done in raw SQL if you're up for it!

I think the 2 programmatic solutions so far are viable, though, too.

(By the way, thanks everyone for the great suggestions!)





>
> On Thu, 9 Feb 2017 at 4:01 am, Xiaomeng Wan <shawn...@gmail.com> wrote:
>
>> You could also try pivot.
>>
>> On 7 February 2017 at 16:13, Everett Anderson <ever...@nuna.com.invalid>
>> wrote:
>>
>>
>>
>> On Tue, Feb 7, 2017 at 2:21 PM, Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>> I think the fastest way is likely to use a combination of conditionals
>> (when / otherwise), first (ignoring nulls), while grouping by the id.
>> This should get the answer with only a single shuffle.
>>
>> Here is an example
>> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/3908422850525880/2840265927289860/latest.html>
>> .
>>
>>
>> Very cool! Using the simpler aggregates feels cleaner.
>>
>>
>>
>> On Tue, Feb 7, 2017 at 5:07 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>> Hi Everett,
>>
>> That's pretty much what I'd do. Can't think of a way to beat your
>> solution. Why do you "feel vaguely uneasy about it"?
>>
>>
>> Maybe it felt like I was unnecessarily grouping-by twice, but probably
>> mostly that I hadn't used pivot before.
>>
>> Interestingly, the physical plans are not especially different between
>> these two solutions after the rank column is added. They both have two
>> SortAggregates that seem to be figuring out where to put results based on
>> the rank:
>>
>> My original one:
>>
>> == Physical Plan ==
>> *Project [id#279, name#280, 1#372.extra AS extra1#409, 1#372.data AS
>> data1#435, 1#372.priority AS priority1#462, 2#374.extra AS extra2#490,
>> 2#374.data AS data2#519, 2#374.priority AS priority2#549, 3#376.extra AS
>> extra3#580, 3#376.data AS data3#612, 3#376.priority AS priority3#645]
>> +- SortAggregate(key=[id#279,name#280], functions=[first(if
>> ((cast(rank#292 as double) = 1.0)) temp_struct#312 else null,
>> true),first(if ((cast(rank#292 as double) = 2.0)) temp_struct#312 else
>> null, true),first(if ((cast(rank#292 as double) = 3.0)) temp_struct#312
>> else null, true)])
>>    +- SortAggregate(key=[id#279,name#280], functions=[partial_first(if
>> ((cast(rank#292 as double) = 1.0)) temp_struct#312 else null,
>> true),partial_first(if ((cast(rank#292 as double) = 2.0)) temp_struct#312
>> else null, true),partial_first(if ((cast(rank#292 as double) = 3.0))
>> temp_struct#312 else null, true)])
>>       +- *Project [id#279, name#280, rank#292, struct(extra#281,
>> data#282, priority#283) AS temp_struct#312]
>>          +- Window [denserank(priority#283) windowspecdefinition(id#279,
>> name#280, priority#283 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT
>> ROW) AS rank#292], [id#279, name#280], [priority#283 ASC]
>>             +- *Sort [id#279 ASC, name#280 ASC, priority#283 ASC], false,
>> 0
>>                +- Exchange hashpartitioning(id#279, name#280, 200)
>>                   +- Scan ExistingRDD[id#279,name#280,
>> extra#281,data#282,priority#283]
>>
>>
>> And modifying Michael's slightly to use a rank:
>>
>> import org.apache.spark.sql.functions._
>>
>> def getColumnWithRank(column: String, rank: Int) = {
>>   first(when(col("rank") === lit(rank), col(column)).otherwise(null),
>> ignoreNulls = true)
>> }
>>
>> val withRankColumn = data.withColumn("rank", 
>> functions.dense_rank().over(Window.partitionBy("id",
>> "name").orderBy("priority")))
>>
>> val modCollapsed = withRankColumn
>>   .groupBy($"id", $"name")
>>   .agg(
>>     getColumnWithRank("data", 1) as 'data1,
>>     getColumnWithRank("data", 2) as 'data2,
>>     getColumnWithRank("data", 3) as 'data3,
>>     getColumnWithRank("extra", 1) as 'extra1,
>>     getColumnWithRank("extra", 2) as 'extra2,
>>     getColumnWithRank("extra", 3) as 'extra3)
>>
>>
>> modCollapsed.explain
>>
>> == Physical Plan ==
>> SortAggregate(key=[id#279,name#280], functions=[first(CASE WHEN
>> (rank#965 = 1) THEN data#282 ELSE null END, true),first(CASE WHEN (rank#965
>> = 2) THEN data#282 ELSE null END, true),first(CASE WHEN (rank#965 = 3) THEN
>> data#282 ELSE null END, true),first(CASE WHEN (rank#965 = 1) THEN extra#281
>> ELSE null END, true),first(CASE WHEN (rank#965 = 2) THEN extra#281 ELSE
>> null END, true),first(CASE WHEN (rank#965 = 3) THEN extra#281 ELSE null
>> END, true)])
>> +- SortAggregate(key=[id#279,name#280], functions=[partial_first(CASE
>> WHEN (rank#965 = 1) THEN data#282 ELSE null END, true),partial_first(CASE
>> WHEN (rank#965 = 2) THEN data#282 ELSE null END, true),partial_first(CASE
>> WHEN (rank#965 = 3) THEN data#282 ELSE null END, true),partial_first(CASE
>> WHEN (rank#965 = 1) THEN extra#281 ELSE null END, true),partial_first(CASE
>> WHEN (rank#965 = 2) THEN extra#281 ELSE null END, true),partial_first(CASE
>> WHEN (rank#965 = 3) THEN extra#281 ELSE null END, true)])
>>    +- *Project [id#279, name#280, extra#281, data#282, rank#965]
>>       +- Window [denserank(priority#283) windowspecdefinition(id#279,
>> name#280, priority#283 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT
>> ROW) AS rank#965], [id#279, name#280], [priority#283 ASC]
>>          +- *Sort [id#279 ASC, name#280 ASC, priority#283 ASC], false, 0
>>             +- Exchange hashpartitioning(id#279, name#280, 200)
>>                +- Scan ExistingRDD[id#279,name#280,
>> extra#281,data#282,priority#283]
>>
>>
>>
>>
>> I'd also check out the execution plan (with explain) to see how it's
>> gonna work at runtime. I may have seen groupBy + join be better than
>> window (there were more exchanges in play for windows I reckon).
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> ----
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Tue, Feb 7, 2017 at 10:54 PM, Everett Anderson <ever...@nuna.com>
>> wrote:
>> >
>> >
>> > On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski <ja...@japila.pl>
>> wrote:
>> >>
>> >> Hi,
>> >>
>> >> Could groupBy and withColumn or UDAF work perhaps? I think window could
>> >> help here too.
>> >
>> >
>> > This seems to work, but I do feel vaguely uneasy about it. :)
>> >
>> > // First add a 'rank' column which is priority order just in case
>> priorities
>> > aren't
>> > // from 1 with no gaps.
>> > val temp1 = data.withColumn("rank", functions.dense_rank()
>> >    .over(Window.partitionBy("id", "name").orderBy("priority")))
>> >
>> > +---+----+-----+------+--------+----+
>> > | id|name|extra|  data|priority|rank|
>> > +---+----+-----+------+--------+----+
>> > |  1|Fred|    8|value1|       1|   1|
>> > |  1|Fred|    8|value8|       2|   2|
>> > |  1|Fred|    8|value5|       3|   3|
>> > |  2| Amy|    9|value3|       1|   1|
>> > |  2| Amy|    9|value5|       2|   2|
>> > +---+----+-----+------+--------+----+
>> >
>> > // Now move all the columns we want to denormalize into a struct column
>> to
>> > keep them together.
>> > val temp2 = temp1.withColumn("temp_struct", struct(temp1("extra"),
>> > temp1("data"), temp1("priority")))
>> >   .drop("extra", "data", "priority")
>> >
>> > +---+----+----+------------+
>> > | id|name|rank| temp_struct|
>> > +---+----+----+------------+
>> > |  1|Fred|   1|[8,value1,1]|
>> > |  1|Fred|   2|[8,value8,2]|
>> > |  1|Fred|   3|[8,value5,3]|
>> > |  2| Amy|   1|[9,value3,1]|
>> > |  2| Amy|   2|[9,value5,2]|
>> > +---+----+----+------------+
>> >
>> > // groupBy, again, but now pivot the rank column. We need an aggregate
>> > function after pivot,
>> > // so use first -- there will only ever be one element.
>> > val temp3 = temp2.groupBy("id", "name")
>> >   .pivot("rank", Seq("1", "2", "3"))
>> >   .agg(functions.first("temp_struct"))
>> >
>> > +---+----+------------+------------+------------+
>> > | id|name|           1|           2|           3|
>> > +---+----+------------+------------+------------+
>> > |  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
>> > |  2| Amy|[9,value3,1]|[9,value5,2]|        null|
>> > +---+----+------------+------------+------------+
>> >
>> > // Now just moving things out of the structs and clean up.
>> > val output = temp3.withColumn("extra1", temp3("1").getField("extra"))
>> >      .withColumn("data1", temp3("1").getField("data"))
>> >      .withColumn("priority1", temp3("1").getField("priority"))
>> >      .withColumn("extra2", temp3("2").getField("extra"))
>> >      .withColumn("data2", temp3("2").getField("data"))
>> >      .withColumn("priority2", temp3("2").getField("priority"))
>> >      .withColumn("extra3", temp3("3").getField("extra"))
>> >      .withColumn("data3", temp3("3").getField("data"))
>> >      .withColumn("priority3", temp3("3").getField("priority"))
>> >      .drop("1", "2", "3")
>> >
>> > +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> > | id|name|extra1| data1|priority1|extra2| data2|priority2|extra3|
>> > data3|priority3|
>> > +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> > |  1|Fred|     8|value1|        1|     8|value8|        2|     8|value5|
>> > 3|
>> > |  2| Amy|     9|value3|        1|     9|value5|        2|  null|  null|
>> > null|
>> > +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >>
>> >>
>> >> Jacek
>> >>
>> >> On 7 Feb 2017 8:02 p.m., "Everett Anderson" <ever...@nuna.com.invalid>
>> >> wrote:
>> >>>
>> >>> Hi,
>> >>>
>> >>> I'm trying to un-explode or denormalize a table like
>> >>>
>> >>> +---+----+-----+------+--------+
>> >>> |id |name|extra|data  |priority|
>> >>> +---+----+-----+------+--------+
>> >>> |1  |Fred|8    |value1|1       |
>> >>> |1  |Fred|8    |value8|2       |
>> >>> |1  |Fred|8    |value5|3       |
>> >>> |2  |Amy |9    |value3|1       |
>> >>> |2  |Amy |9    |value5|2       |
>> >>> +---+----+-----+------+--------+
>> >>>
>> >>> into something that looks like
>> >>>
>> >>>
>> >>> +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> >>> |id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3
>> >>> |priority3|
>> >>>
>> >>> +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> >>> |1  |Fred|8     |value1|1        |8     |value8|2        |8
>>  |value5|3
>> >>> |
>> >>> |2  |Amy |9     |value3|1        |9     |value5|2        |null  |null
>> >>> |null     |
>> >>>
>> >>> +---+----+------+------+---------+------+------+---------+--
>> ----+------+---------+
>> >>>
>> >>> If I were going the other direction, I'd create a new column with an
>> >>> array of structs, each with 'extra', 'data', and 'priority' fields
>> and then
>> >>> explode it.
>> >>>
>> >>> Going from the more normalized view, though, I'm having a harder time.
>> >>>
>> >>> I want to group or partition by (id, name) and order by priority, but
>> >>> after that I can't figure out how to get multiple rows rotated into
>> one.
>> >>>
>> >>> Any ideas?
>> >>>
>> >>> Here's the code to create the input table above:
>> >>>
>> >>> import org.apache.spark.sql.Row
>> >>> import org.apache.spark.sql.Dataset
>> >>> import org.apache.spark.sql.types._
>> >>>
>> >>> val rowsRDD = sc.parallelize(Seq(
>> >>>     Row(1, "Fred", 8, "value1", 1),
>> >>>     Row(1, "Fred", 8, "value8", 2),
>> >>>     Row(1, "Fred", 8, "value5", 3),
>> >>>     Row(2, "Amy", 9, "value3", 1),
>> >>>     Row(2, "Amy", 9, "value5", 2)))
>> >>>
>> >>> val schema = StructType(Seq(
>> >>>     StructField("id", IntegerType, nullable = true),
>> >>>     StructField("name", StringType, nullable = true),
>> >>>     StructField("extra", IntegerType, nullable = true),
>> >>>     StructField("data", StringType, nullable = true),
>> >>>     StructField("priority", IntegerType, nullable = true)))
>> >>>
>> >>> val data = sqlContext.createDataFrame(rowsRDD, schema)
>> >>>
>> >>>
>> >>>
>> >
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>>
>>
>> --
> Best Regards,
> Ayan Guha
>

Reply via email to