[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2020-03-31 Thread fqaiser94 (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17072212#comment-17072212
 ] 

fqaiser94 commented on SPARK-22231:
---

Makes sense, thanks!

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>Priority: Major
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct (containsNull = true)
> // |||-- a: integer (nullable = true)
> // |||-- b: double (nullable = true)
> result.show(false)
> // +---++--+
> // |foo|bar |items |
> // +---++--+
> // |10 |10.0|[[10,11.0], [11,12.0]]|
> // |20 |20.0|[[20,21.0], [21,22.0]]|
> // +---++--+
> {code}

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2020-03-31 Thread DB Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17071969#comment-17071969
 ] 

DB Tsai commented on SPARK-22231:
-

[~fqaiser94] Thanks for continuing this work. We implemented this feature while 
I was at Netflix, and it's ready useful for end users to manipulate nested 
dataframe. Currently, we try to not assign the ticket to prevent someone is 
being assigned but not works on it. Therefore, I unassigned this JIRA.

In your PR, you implement the first part, and I create a sub-task for it. 
https://issues.apache.org/jira/browse/SPARK-31317 Can you change the Jira 
number to  SPARK-31317 to have it properly linked? 

 

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>Priority: Major
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2020-03-31 Thread fqaiser94 (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17071945#comment-17071945
 ] 

fqaiser94 commented on SPARK-22231:
---

Excellent, thanks Reynold. 
If somebody could assign this ticket to me, that would be great as I have the 
code ready for all 3 methods. 
Pull request for the first one ({{withField}}) is ready here: 
[https://github.com/apache/spark/pull/27066]
Looking forward to the reviews. 

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>Assignee: Jeremy Smith
>Priority: Major
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct (containsNull = true)
> // |||-- a: integer (nullable = true)
> // ||

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2020-03-29 Thread Reynold Xin (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17070688#comment-17070688
 ] 

Reynold Xin commented on SPARK-22231:
-

[~fqaiser94] thanks for your persistence and my apologies for the delay. You 
have my buy-in. This is a great idea.

 

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>Assignee: Jeremy Smith
>Priority: Major
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct (containsNull = true)
> // |||-- a: integer (nullable = true)
> // |||-- b: double (nullable = true)
> result.show(false)
> // +---++--+
> // |foo|bar |items |
> // 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2020-01-26 Thread fqaiser94 (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17023919#comment-17023919
 ] 

fqaiser94 commented on SPARK-22231:
---

@rxin no problems, take your time.

Apparently Spark 3.0 code freeze is coming up soon so for those who are 
interested in seeing these features sooner rather later, I wanted to share a 
[library|https://github.com/fqaiser94/mse] I've written that adds 
{{withField}}, {{withFieldRenamed}}, and {{dropFields}} methods to the Column 
class implicitly. The signatures of the methods follows pretty much what we've 
discussed so far in this ticket. Hopefully it's helpful to others. 

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>Assignee: Jeremy Smith
>Priority: Major
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2020-01-14 Thread Reynold Xin (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17015705#comment-17015705
 ] 

Reynold Xin commented on SPARK-22231:
-

Hey sorry. Been pretty busy. I will take a look this week.

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>Assignee: Jeremy Smith
>Priority: Major
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct (containsNull = true)
> // |||-- a: integer (nullable = true)
> // |||-- b: double (nullable = true)
> result.show(false)
> // +---++--+
> // |foo|bar |items |
> // +---++--+
> // |10 |10.0|[[10,11.0], [11,12.0]]|
> // |20 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2020-01-13 Thread fqaiser94 (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17014252#comment-17014252
 ] 

fqaiser94 commented on SPARK-22231:
---

[~rxin] do you have any other questions/thoughts on this? 

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>Assignee: Jeremy Smith
>Priority: Major
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct (containsNull = true)
> // |||-- a: integer (nullable = true)
> // |||-- b: double (nullable = true)
> result.show(false)
> // +---++--+
> // |foo|bar |items |
> // +---++--+
> // |10 |10.0|[[10,11.0], [11,12.0]]|
> // |20 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2020-01-03 Thread fqaiser94 (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007543#comment-17007543
 ] 

fqaiser94 commented on SPARK-22231:
---

Just to be complete, there is a third option here. 
 We don't necessarily have to add new methods to Column or Dataset. 
 We could just add ({{add_field}}, {{rename_field}}, {{drop_field}}) methods to 
the {{org.apache.spark.sql.functions}} Object. 
 That said, I still feel it's more natural for these methods to belong to 
Column. 

Regarding your question: *Can withField modify a nested field itself?*
 Short answer; no, not in the implementation I was imagining.

Long answer; to be honest, I initially came across this Jira while trying to 
write a method that would do exactly that (i.e. modify a deeply nested field 
directly). 
 However, if you were to write {{Column.withField}} so that it can *only* 
modify StructFields at the top level of a StructType column passed to it, this 
is flexible enough to  support not only all of the use-cases mentioned so far 
in this Jira (in a succinct way) but *also* you could write a new method that 
builds on top of {{Column.withField}} (or the underlying {{BinaryExpression}}) 
to directly modify nested fields. Here is a quick implementation of this (that 
I'm not proud of) which could be included in {{org.apache.spark.sql.functions}} 
Object. :
{code:java}
object functions {
  import org.apache.spark.sql.catalyst.expressions.NamedExpression
  import org.apache.spark.sql.functions._

  /**
   *
   * @param nestedStructType   : e.g. $"a.b" where a and b are both StructType. 
   * @param fieldName  : Field to add/replace in nestedStructType based 
on name.  
   * @param fieldValue : Value to assign to fieldName
   * @return a copy the top-level nestedStructType column with fieldName 
added/replaced
   */
  def add_nested_field(
  nestedStructField: Column,
  fieldName: String,
  fieldValue: Column): Column = {
val parentColumnAndChildFieldNamePairs: Seq[(Column, String)] = {
  // TODO: handle case where nested Column name contains escaped "."
  val parentColumnNames = getAllNestedStructColumnNames(nestedStructField)
  val childFieldNames = parentColumnNames.drop(1).map(_.split("\\.").last) 
:+ fieldName
  parentColumnNames.map(col).zip(childFieldNames)
}

val updatedNestedStructColumn = parentColumnAndChildFieldNamePairs.reverse
  .foldLeft(fieldValue) {
case (childCol, (parentCol, childFieldName)) =>
  parentCol.withField(childFieldName, childCol)
  }

updatedNestedStructColumn
  }

  private def getAllNestedStructColumnNames(structColumn: Column): Seq[String] 
= {
val nameParts = 
structColumn.expr.asInstanceOf[NamedExpression].toAttribute.name.split("\\.").toSeq

var result = Seq.empty[String]
var i = 0
while (i < nameParts.length) {
  result = result :+ nameParts.slice(0, i + 1).mkString(".")
  i += 1
}
result
  }
}

{code}
Which could then be applied as follows:
{code:java}
val data = spark.createDataFrame(
  sc.parallelize(Seq(Row(Row(Row(1, 2, 3), Row(4, null, 6),
  StructType(Seq(
StructField("a", StructType(Seq(
  StructField("a", StructType(Seq(
StructField("a", IntegerType),
StructField("b", IntegerType),
StructField("c", IntegerType, 
  StructField("b", StructType(Seq(
StructField("a", IntegerType),
StructField("b", IntegerType),
StructField("c", IntegerType
  )
).cache

data.show(false)
+--+
|a |
+--+
|[[1, 2, 3], [4,, 6]]  |
+--+

data.printSchema
|-- a: struct (nullable = true)
||-- a: struct (nullable = true)
|||-- a: integer (nullable = true)
|||-- b: integer (nullable = true)
|||-- c: integer (nullable = true)
||-- b: struct (nullable = true)
|||-- a: integer (nullable = true)
|||-- b: integer (nullable = true)
|||-- c: integer (nullable = true)

// replace deeply nested field
data.withColumn("newA", functions.add_nested_field($"a.b", "b", 
lit(5))).show(false)
+--+--+
|a |newA  |
+--+--+
|[[1, 2, 3], [4,, 6]]  |[[1, 2, 3], [4, 5, 6]]|
+--+--+

// add new deeply nested field
data.withColumn("newA", functions.add_nested_field($"a.b", "d", 
lit("hello"))).show(false)
++---+
|a   |newA   |
++---+
|[[1, 2, 3], [4,, 6]]|[[1, 2, 3], [4,, 6, hello]]|
++---+

{code}
 Another reason why I would want this feature as a separate method is because 
for {{Column.withField}} to support modifying 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2020-01-02 Thread Reynold Xin (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007215#comment-17007215
 ] 

Reynold Xin commented on SPARK-22231:
-

[~fqaiser94] you convinced me with #2. It'd be very verbose if we only allow 
DataFrame.withColumnRenamed to modify nested fields and no new methods in 
Column.

#1 isn't really a problem because DataFrame.withColumnRenamed should be able to 
handle both top level field and struct fields as well.

 

 

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>Assignee: Jeremy Smith
>Priority: Major
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct (containsNull = true)
> // |||-- a: integer 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2020-01-02 Thread fqaiser94 (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007191#comment-17007191
 ] 

fqaiser94 commented on SPARK-22231:
---

I feel strongly that {{withField}} and {{withFieldRenamed}} methods should be 
on Column for the following reasons:

+*Reason #1*+

These methods are intended to only ever operate on {{StructType}} columns. 
 It does *not* make much sense to me to add methods to Dataframe that operate 
only on {{StructType}} columns. 
 It does however make sense to me to add methods to Column that operate only on 
{{StructType}} columns and there is lots of precedent for this e.g. the 
existing {{getField}} method on Column operates only on {{StructType}} columns, 
the existing {{getItem}} method on Column operates only on {{ArrayType}} and 
{{MapType}} columns, etc.

+*Reason #2*+

I'm struggling to see how one would express operations on deeply nested 
StructFields easily if you were to put these methods on Dataframe. Using the 
{{withField}} method I added to Column in my PR, you can add or replace deeply 
nested StructFields like this:
{code:java}
data.show(false)
+-+
|a                                |
+-+
|[[1, 2, 3], [[4,, 6], [7, 8, 9]]]|
+-+

data.withColumn("a", 'a.withField(
  "b", $"a.b".withField(
"a", $"a.b.a".withField(
  "b", lit(5).show(false)
+---+
|a                                  |
+---+
|[[1, 2, 3], [[4, 5, 6], [7, 8, 9]]]|
+---+
{code}
You can see a fully reproducible examples of this in my PR: 
[https://github.com/apache/spark/pull/27066]

Another common use-case that would be hard to express by adding the methods to 
Dataframe would be operations on {{ArrayType(StructType)}} or 
{{MapType(}}{{StructType}}{{, StructType)}} columns. By adding the methods to 
Column, you can express such operations naturally using the recently added 
higher-order-functions feature in Spark:
{code:java}
val data = spark.createDataFrame(
  sc.parallelize(Seq(Row(List(Row(1, 2, 3), Row(4, 5, 6),
  StructType(Seq(
StructField("array", ArrayType(StructType(Seq(
  StructField("a", IntegerType),
  StructField("b", IntegerType),
  StructField("c", IntegerType)
))
  ).cache

data.show(false)
+--+
|array                 |
+--+
|[[1, 2, 3], [4, 5, 6]]|
+--+

data.withColumn("newArray", transform('array, structElem => 
structElem.withField("d", lit("hello".show(false)
+--++
|array                 |newArray                            |
+--++
|[[1, 2, 3], [4, 5, 6]]|[[1, 2, 3, hello], [4, 5, 6, hello]]|
+--++
{code}
 +*Reason #3*+

To add these methods to Dataframe, we would need signatures that would look 
strange compared to the methods available on Dataframe today. Off the top of my 
head, their signatures could look like this:
 * {{def withField(colName: String, structColumn: Column, fieldName: String, 
field: Column)}}
 Returns a new Dataframe with a new Column (colName) containing a copy of 
structColumn with field added/replaced based on name.
 * {{def withFieldRenamed(}}{{structColumn}}{{: Column, existingFieldName: 
String, newFieldName: String)}}
 Returns a new Dataframe with existingFieldName in structColumn renamed to 
newFieldName.

Maybe these signatures could be refined further? I'm not sure. 

 

For these reasons, it seems more natural and logical to me to have the 
{{withField}} and {{withFieldRenamed}} methods on Column rather than Dataframe. 
Regarding the proposed {{drop}} method, I think its debatable: 
 * {{drop}} method could be added to Column. 
 I can't at present think of a particular use-case which would necessitate 
adding the {{drop}} method to Column.
 * The existing {{drop}} method on Dataframe could be augmented to support 
dropping StructField if a reference to a StructField is passed to it. I'm not 
sure how challenging this would be to implement but presumably this should be 
possible. 

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>Assignee: Jeremy Smith
>Priority: Major
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2020-01-02 Thread Henry Davidge (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007131#comment-17007131
 ] 

Henry Davidge commented on SPARK-22231:
---

I agree that these methods make more sense on DataFrame with the caveat that 
they need sane behavior for arrays of structs as well to avoid the need for 
column-level APIs. Otherwise you have the same problem as today if you want to 
add / remove a field from an array of structs. I think ideally 
{{df.withColumn("array_of_structs.new_field", expr("array_of_structs.field_a + 
array_of_structs.field_b"))}} would be equivalent to 
{{transform(array_of_structs, el -> with_column("new_field", el.field_a + 
el.field_b))}} in a column-level API.

Btw, this comes up a lot with genomic data, so we added a version of 
{{withField}} to the Glow library that I would love to deprecate: 
https://github.com/projectglow/glow/blob/master/core/src/main/scala/io/projectglow/sql/optimizer/hlsOptimizerRules.scala#L32-L37

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>Assignee: Jeremy Smith
>Priority: Major
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2020-01-02 Thread Reynold Xin (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007117#comment-17007117
 ] 

Reynold Xin commented on SPARK-22231:
-

Makes sense. One question (I've asked about this before): should the 3 
functions be on DataFrame, or on Column? 

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>Assignee: Jeremy Smith
>Priority: Major
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct (containsNull = true)
> // |||-- a: integer (nullable = true)
> // |||-- b: double (nullable = true)
> result.show(false)
> // +---++--+
> // |foo|bar |items |
> // 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2020-01-02 Thread fqaiser94 (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007116#comment-17007116
 ] 

fqaiser94 commented on SPARK-22231:
---

Hi folks, I can personally affirm that this would be a valuable feature to have 
in Spark. Looking around, its clear to me that other people have a need for 
this feature as well e.g. SPARK-16483,  [Mailing 
list|http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-SQL-Applying-transformation-on-a-struct-inside-an-array-td18934.html],
 etc. 

A few things have changed since the last comments in this discussion. Support 
for higher-order functions has been 
[added|https://databricks.com/blog/2018/11/16/introducing-new-built-in-functions-and-higher-order-functions-for-complex-data-types-in-apache-spark.html]
 to the Apache Spark project and in particular, there are now {{transform}} and 
{{filter}} functions available for operating on ArrayType columns. These would 
be the equivalent of the {{mapItems}} and {{filterItems}} functions that were 
previously proposed in this ticket. To complete this ticket then, I think all 
that is needed is adding the {{withField}}, {{withFieldRenamed}}, and {{drop}} 
methods to the Column class. Looking through the discussion, I can summarize 
the signatures for these new methods should be as follows: 
 - {{def withField(fieldName: String, field: Column): Column}}
 Returns a new StructType column with field added/replaced based on name.

 - {{def drop(colNames: String*)}}
 Returns a new StructType column with field dropped.

 - {{def withFieldRenamed(existingName: String, newName: String): Column}}
 Returns a new StructType column with field renamed.

Since it didn't seem like anybody was actively working on this, I went ahead 
and created a pull request to add a {{withField}} method to the {{Column}} 
class that conforms with the specs discussed in this ticket. You can review the 
PR here: [https://github.com/apache/spark/pull/27066]

As this is my first PR to the Apache Spark project, I wanted to keep the PR 
small. However, I wouldn't mind writing the {{drop}} and {{withFieldRenamed}} 
methods as well in separate PRs once the current PR is accepted. 

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>Assignee: Jeremy Smith
>Priority: Major
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2019-01-04 Thread Timothy Pharo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16734160#comment-16734160
 ] 

Timothy Pharo commented on SPARK-22231:
---

[~dbtsai] and [~jeremyrsmith], this all looks great and is just what we have 
been looking.  As this isn't yet available in Spark upstream and is not likely 
to be available any time soon, is there any plan to expose this separately in 
the interim?

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>Assignee: Jeremy Smith
>Priority: Major
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct (containsNull = true)
> // |||-- a: integer (nullable = true)
> // |||-- b: double (nullable 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-11-14 Thread DB Tsai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16252611#comment-16252611
 ] 

DB Tsai commented on SPARK-22231:
-

Thanks [~jeremyrsmith] for adding more details. There are couple technical 
challenging as [~jeremyrsmith] pointed out; we can agree on what APIs should 
look like in this JIRA first, and then address those challenging in PRs. 

# [~rxin], with {{dropColumn}} support in {{Column}}, we can have some share 
building block on {{Column}} like 
{code}
def computeNewFeature: Column => Column = (col: Column) => {
  col.withColumn(col("feature") * 2 as "newFeature").dropColumn("feature")
}
{code} I agree that this can be achieved with dataframe with {{drop}} as well, 
but sometimes, it's more convenient to work with {{Column}} API. Plus, as 
[~nkronenfeld] pointed out, it's nice to have {{Column}} and {{Dataset}} to 
share some signatures. 
# In our internal implementation, we use {{Column.withField}} instead of 
{{Column.withColumn}}. Since I prefer to have both {{Column}} and {{Dataset}} 
sharing the same method names, so in the example I wrote above, I use 
{{Column.withColumn}}. But I agree that {{Column.withField}} is less confusing. 
Anymore feedback on this?


> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>Assignee: Jeremy Smith
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-16 Thread Jeremy Smith (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16206143#comment-16206143
 ] 

Jeremy Smith commented on SPARK-22231:
--

[~viirya] - It is confusing that the lambda is {{Column => Column}}, but that 
is the necessary type signature. This lambda essentially receives an 
{{Expression}} (wrapped in a {{Column}}, which is the user-accessible mechanism 
for specifying an {{Expression}}), and returns a new {{Expression}} (specified 
by performing {{Column}} operations) which captures the transformation to be 
done on the input {{Expression}}. This lambda is given to {{MapObjects}} or 
{{FilterObjects}} which execute the transformation for each item present in the 
result of the input {{Expression}}.

The underlying expressions {{MapObjects}} and {{FilterObjects}} are agnostic to 
where the input {{Expression}} or the {{Expression => Expression}} lambda come 
from, so it would be possible to create an alternate user-facing {{Element}} 
abstraction which also captures an {{Expression}} similarly to {{Column}}. But 
I don't think it's worthwhile to do that just to resolve some confusing 
terminology in this rather specific case of a {{Column}} representing an 
element of an array column.

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>Assignee: Jeremy Smith
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-10 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199789#comment-16199789
 ] 

Liang-Chi Hsieh commented on SPARK-22231:
-

[~Jeremy Smith] Thanks for the context. Regarding the versions for the 
`Column`, I have a question:

You said the input `Column` to the lambda represents an element of the array. 
If so, should we call it `Element` instead of `Column`? Because it sounds not a 
column at all.



> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>Assignee: Jeremy Smith
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct (containsNull = true)
> // |||-- a: integer (nullable = true)
> // |||-- b: double 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-10 Thread Jeremy Smith (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198965#comment-16198965
 ] 

Jeremy Smith commented on SPARK-22231:
--

I implemented these at Netflix, so I wanted to provide some more context and 
answer some of the questions above.

First, [~nkronenfeld]:
1. I named it `withField` on a column, because it makes it clear that it's 
expected to be a struct column and you're dealing with fields of the struct. My 
take was that `withColumn` on a Column is somewhat confusing.
2. Maybe `withFieldRenamed`, following the above reasoning?
3. `mapItems` works at the Dataset level as well. We have extension methods for 
`Column` and `Dataset` to provide `mapItems` and `filterItems`. On `Column` 
there are two versions:

  * `[map|filter]Items(fn: Column => Column)` assumes the column upon which 
it's invoked is an array column, and returns a new array column which is 
updated using the given `Column => Column` lambda. The input `Column` to the 
lambda represents an element of the array column.
  * `[map|filter]Items(fieldName: String)(fn: Column => Column): Column` 
assumes the column upon which it's invoked is a struct, and that `fieldName` is 
an array field of that struct. It applies the `Column => Column` lambda to each 
element of the array field, and returns a new struct column with the 
`fieldName` field updated and all other fields of the original struct preserved.

On `Dataset`, the enrichment provides only the second form of extension methods:

* `[map|filter]Items(columnName: String)(fn: Column => Column): Dataset[Row]` 
assumes the given `columnName` of the `Dataset` upon which it's invoked is an 
array column, and returns a new `Dataset` with that column mapped using the 
given `Column => Column` lambda. All other columns of the original `Dataset` 
are preserved.

I wanted to also add a bit of context around how these are implemented and why 
we didn't simply make a PR with the code.

`mapItems` and `filterItems` behind the scenes use catalyst expressions for 
their implementation. `mapItems` simply provides an interface to the existing 
`MapObjects` expression, and `filterItems` uses a similar expression 
`FilterObjects` that we implemented. This presents a couple of issues with 
implementation:

1. `MapObjects` does not support `eval`, so if you get into the `eval` codepath 
(i.e. with anything that does `CodegenFallback`) then it will fail. We mitigate 
this by never using the default `CodegenFallback` and instead implemented our 
own `CodegenFallback`-like traits which use `eval` only for the current 
expression but use codegen to evaluate child expressions. We implemented both 
codegen and eval for `FilterObjects`, and we had originally intended to also 
implement eval for `MapObjects` and submit both in a PR. But it proved very 
difficult to test these in the existing expression test framework in the Spark 
codebase (we used our own test harness to test both code paths of 
`FilterObjects`). So that effort never came to fruition. Note that we're 
currently on Spark 2.0 with some backported patches, so maybe this will be 
easier when we migrate to Spark 2.1?
2. In order to work properly, we had to make sure that the columns given to 
`MapObjects` and `FilterObjects` are fully resolved when the expression is 
created; this was a while ago, but I think the reason was that transformation 
rules during analysis aren't easily extensible and the rules around 
`MapObjects` enforce that it must be fully resolved before analysis. You also 
need a DataType to pass to these, which you can only get from a resolved 
column. So you currently must do something like `df("col")` rather than 
`$"col"`, or you'll get an immediate exception. Another issue with this is that 
if you're doing nested transformations with `mapItems`, you cannot have fully 
resolved columns. We mitigated this by doing an out-of-band "ad-hoc analysis" 
step where we manually traverse the expression tree and replace 
`UnresolvedAlias` and `UnresolvedExtractValue` with their resolved versions, 
purely for the purposes of getting the DataType out. This is obviously not 
ideal, but again it's something that might require less workaround in newer 
Spark versions.

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-10 Thread Nathan Kronenfeld (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198861#comment-16198861
 ] 

Nathan Kronenfeld commented on SPARK-22231:
---

One couple related concerns... 

# I think Column.withColumn and Dataset.withColumn should have the same 
signature.  I don't really care which it uses - your pattern works just as well 
as the pattern currently used by Dataset - but consistency between the two 
would be nice.
Perhaps even we could allow either pattern in both places?
# Similarly, perhaps Column should also get a withColumnRenamed method?
# Similarly, is there any reason mapItems should work at the column level, but 
not at the Dataset level? 

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-10 Thread DB Tsai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198286#comment-16198286
 ] 

DB Tsai commented on SPARK-22231:
-

[~viirya] Thanks. I fixed the typo :) Yes, {{mapItems}} will work on any Array 
type column. Since the dataframe can be anything, these analysis check will 
happen in the runtime. For dataset, we should be able to do the compile time 
check, but I'm not sure how to do it.

Will be nice to have [~hvanhovell] to chime in, and we can share the building 
blocks between the higher order function APIs in SQL and our Scala APIs. 

Thanks.

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-10 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198301#comment-16198301
 ] 

Reynold Xin commented on SPARK-22231:
-

For drop columns - why not just df.drop("items.b")?


> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct (containsNull = true)
> // |||-- a: integer (nullable = true)
> // |||-- b: double (nullable = true)
> result.show(false)
> // +---++--+
> // |foo|bar |items |
> // +---++--+
> // |10 |10.0|[[10,11.0], [11,12.0]]|
> // |20 |20.0|[[20,21.0], [21,22.0]]|
> // +---++--+
> 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-10 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198306#comment-16198306
 ] 

Reynold Xin commented on SPARK-22231:
-

Can you say more? I can't think of a case in which you'd want to drop a nested 
column for some rows but not other rows (which implicitly passing in a closure 
would allow you to do).


> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct (containsNull = true)
> // |||-- a: integer (nullable = true)
> // |||-- b: double (nullable = true)
> result.show(false)
> // +---++--+
> // |foo|bar |items |
> // 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-10 Thread DB Tsai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198305#comment-16198305
 ] 

DB Tsai commented on SPARK-22231:
-

[~rxin] {{df.drop("items.b")}} works too. We also want to have {{drop}} API in 
{{Column}} for constructing more complicated expressions. 

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct (containsNull = true)
> // |||-- a: integer (nullable = true)
> // |||-- b: double (nullable = true)
> result.show(false)
> // +---++--+
> // |foo|bar |items |
> // +---++--+
> // |10 |10.0|[[10,11.0], [11,12.0]]|
> 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-09 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198146#comment-16198146
 ] 

Liang-Chi Hsieh commented on SPARK-22231:
-

Btw, the capacity to work on nested data types looks similar to the higher 
order functions proposed at 
https://databricks.com/blog/2017/05/24/working-with-nested-data-using-higher-order-functions-in-sql-on-databricks.html.

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct (containsNull = true)
> // |||-- a: integer (nullable = true)
> // |||-- b: double (nullable = true)
> result.show(false)
> // +---++--+
> // |foo|bar 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-09 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198096#comment-16198096
 ] 

Liang-Chi Hsieh commented on SPARK-22231:
-

Looks like `mapItems` is an API can work on any Array type column?

When will we detect possible error of operations on the element? Will we do 
analysis check or we only know it during runtime? For example, when we do 
`item.drop("b")` but there is no such column in `item`, will we know it before 
running the job?

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct (containsNull = true)
> // |||-- a: integer (nullable = true)
> // |||-- b: double 

[jira] [Commented] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-09 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198082#comment-16198082
 ] 

Liang-Chi Hsieh commented on SPARK-22231:
-

I think there is a typo in the second example to add a new column:
{code:scala}
val result = df.mapItems("items") {
  item => item.withColumn(item("b") + 1 as "c")
}
{code}

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+++
> // |foo| bar|   items|
> // +---+++
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+++
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct (containsNull = true)
> // |||-- a: integer (nullable = true)
> // |||-- b: double (nullable = true)
> result.show(false)
> // +---++--+
> // |foo|bar |items |
> //