[jira] [Comment Edited] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2020-01-26 Thread fqaiser94 (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17023919#comment-17023919
 ] 

fqaiser94 edited comment on SPARK-22231 at 1/26/20 8:35 PM:


[~rxin] no problem, take your time.

Apparently the Spark 3.0 code freeze is coming up soon, so for those who are 
interested in seeing these features sooner rather than later, I wanted to share 
a [library|https://github.com/fqaiser94/mse] I've written that adds 
{{withField}}, {{withFieldRenamed}}, and {{dropFields}} methods to the Column 
class implicitly. The signatures of these methods follow pretty much what we've 
discussed so far in this ticket. Hopefully it's helpful to others. 
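
For a rough idea of the shape of the API (the dataframe, column, and field names 
below are made up purely for illustration):

{code:java}
// Illustrative sketch only; see https://github.com/fqaiser94/mse for real usage.
// After bringing the library's implicit Column methods into scope
// (see the README for the exact import), you can write things like:
import org.apache.spark.sql.functions._

df.withColumn("a", $"a"
  .withField("w", lit(0))              // add a new field "w" to struct column "a"
  .withFieldRenamed("y", "renamed_y")  // rename field "y"
  .dropFields("z"))                    // drop field "z"
{code}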


was (Author: fqaiser94):
@rxin no problems, take your time.

Apparently Spark 3.0 code freeze is coming up soon so for those who are 
interested in seeing these features sooner rather later, I wanted to share a 
[library|https://github.com/fqaiser94/mse] I've written that adds 
{{withField}}, {{withFieldRenamed}}, and {{dropFields}} methods to the Column 
class implicitly. The signatures of the methods follows pretty much what we've 
discussed so far in this ticket. Hopefully it's helpful to others. 

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>Assignee: Jeremy Smith
>Priority: Major
>
> At Netflix's algorithm team, we work on ranking problems to find great 
> content that fulfills the unique tastes of our members. Before building a 
> recommendation algorithm, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the context describing the setting in which a model is to be used (e.g. 
> profiles, country, time, device, etc.). Here is a blog post describing the 
> details: [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id in a 
> given country, our data schema looks like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying 
> functions to them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into an RDD to work on the nested level of data, and 
> then reconstruct the new dataframe as a workaround. This is extremely 
> inefficient because optimizations like predicate pushdown in SQL cannot be 
> performed, we cannot leverage the columnar format, and the serialization and 
> deserialization cost becomes huge even if we just want to add a new column at 
> the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to open source it in Spark upstream. We would like to socialize the 
> API design to see if we missed any use cases.  
> The first API we added is *mapItems* on dataframe, which takes a function 
> from *Column* to *Column* and applies it to the nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+----+--------------------+
> // |foo| bar|               items|
> // +---+----+--------------------+
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+----+--------------------+
> {code}
> Now, with the ability to apply a function to the nested dataframe, we can 
> add a new function, *withColumn*, on *Column* to add or replace an existing 
> column that has the same name in the nested list of structs.

[jira] [Comment Edited] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2020-01-02 Thread Reynold Xin (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007215#comment-17007215
 ] 

Reynold Xin edited comment on SPARK-22231 at 1/3/20 4:18 AM:
-

[~fqaiser94] you convinced me with #2. It'd be very verbose if we only allowed 
DataFrame.withColumnRenamed to modify nested fields and added no new methods to 
Column.

#1 isn't really a problem because DataFrame.withColumnRenamed should be able to 
handle both top-level fields and struct fields.

 

Another question: can withField modify a nested field itself?
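
For illustration, with the chaining pattern discussed elsewhere in this ticket, 
nested modification would look roughly like this hypothetical sketch ({{withField}} 
is only a proposal at this point):

{code:java}
// Hypothetical sketch; sets a.b.c to 5 by rebuilding each level of the struct.
df.withColumn("a", $"a".withField(
  "b", $"a.b".withField(
    "c", lit(5))))
{code}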

 


was (Author: rxin):
[~fqaiser94] you convinced me with #2. It'd be very verbose if we only allow 
DataFrame.withColumnRenamed to modify nested fields and no new methods in 
Column.

#1 isn't really a problem because DataFrame.withColumnRenamed should be able to 
handle both top level field and struct fields as well.

 

 


[jira] [Comment Edited] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2020-01-02 Thread fqaiser94 (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007191#comment-17007191
 ] 

fqaiser94 edited comment on SPARK-22231 at 1/3/20 3:28 AM:
---

I feel that the {{withField}} and {{withFieldRenamed}} methods make more sense 
on Column than on Dataframe, for the following reasons:

+*Reason #1*+

These methods are intended to only ever operate on {{StructType}} columns. 
 It does *not* make much sense to me to add methods to Dataframe that operate 
only on {{StructType}} columns. 
 It does however make sense to me to add methods to Column that operate only on 
{{StructType}} columns and there is lots of precedent for this e.g. the 
existing {{getField}} method on Column operates only on {{StructType}} columns, 
the existing {{getItem}} method on Column operates only on {{ArrayType}} and 
{{MapType}} columns, etc.

+*Reason #2*+

I'm struggling to see how one would express operations on deeply nested 
StructFields easily if you were to put these methods on Dataframe. Using the 
{{withField}} method on Column, you can add or replace deeply nested 
StructFields like this:
{code:java}
data.show(false)
+---------------------------------+
|a                                |
+---------------------------------+
|[[1, 2, 3], [[4,, 6], [7, 8, 9]]]|
+---------------------------------+

data.withColumn("a", 'a.withField(
  "b", $"a.b".withField(
    "a", $"a.b.a".withField(
      "b", lit(5))))).show(false)
+-----------------------------------+
|a                                  |
+-----------------------------------+
|[[1, 2, 3], [[4, 5, 6], [7, 8, 9]]]|
+-----------------------------------+
{code}
You can see a fully reproducible example of this in my PR: 
[https://github.com/apache/spark/pull/27066]

Another common use-case that would be hard to express by adding the methods to 
Dataframe would be operations on {{ArrayType(StructType)}} or 
{{MapType(StructType, StructType)}} columns. By adding the methods to 
Column, you can express such operations naturally using the recently added 
higher-order functions feature in Spark:
{code:java}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val data = spark.createDataFrame(
  sc.parallelize(Seq(Row(List(Row(1, 2, 3), Row(4, 5, 6))))),
  StructType(Seq(
    StructField("array", ArrayType(StructType(Seq(
      StructField("a", IntegerType),
      StructField("b", IntegerType),
      StructField("c", IntegerType)
    ))))))
).cache

data.show(false)
+----------------------+
|array                 |
+----------------------+
|[[1, 2, 3], [4, 5, 6]]|
+----------------------+

data.withColumn("newArray", transform('array, structElem => 
  structElem.withField("d", lit("hello")))).show(false)
+----------------------+------------------------------------+
|array                 |newArray                            |
+----------------------+------------------------------------+
|[[1, 2, 3], [4, 5, 6]]|[[1, 2, 3, hello], [4, 5, 6, hello]]|
+----------------------+------------------------------------+
{code}
+*Reason #3*+

To add these methods to Dataframe, we would need signatures that would look 
strange compared to the methods available on Dataframe today. Off the top of my 
head, their signatures could look like this:
 * {{def withField(colName: String, structColumn: Column, fieldName: String, field: Column): Dataframe}}
 Returns a new Dataframe with a new Column (colName) containing a copy of 
structColumn with field added/replaced based on name.
 * {{def withFieldRenamed(structColumn: Column, existingFieldName: String, newFieldName: String): Dataframe}}
 Returns a new Dataframe with existingFieldName in structColumn renamed to 
newFieldName.

Maybe these signatures could be refined further? I'm not sure. 

 

For these reasons, it seems more natural and logical to me to have the 
{{withField}} and {{withFieldRenamed}} methods on Column rather than Dataframe. 
Regarding the proposed {{drop}} method, I think it's debatable: 
 * A {{drop}} method could be added to Column. 
 I can't at present think of a particular use-case which would necessitate 
adding the {{drop}} method to Column.
 * The existing {{drop}} method on Dataframe could be augmented to support 
dropping a StructField if a reference to a StructField is passed to it. I'm not 
sure how challenging this would be to implement, but presumably it should be 
possible (see the workaround sketch below for what users have to do today). 
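
For context, the workaround available today (without any of these methods) is to 
rebuild the struct minus the unwanted field, which is exactly the verbosity this 
proposal is trying to remove. A minimal sketch, assuming a struct column "a" 
with fields "x", "y", and "z" where we want to drop "y":

{code:java}
import org.apache.spark.sql.functions._

// Rebuild "a" keeping only the fields we want; "y" is effectively dropped.
val result = df.withColumn("a", struct($"a.x".as("x"), $"a.z".as("z")))
{code}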


was (Author: fqaiser94):
I feel that the {{withField}} and {{withFieldRenamed}} methods make more sense 
to be on Column rather than Dataframe for the following reasons:

+*Reason #1*+

These methods are intended to only ever operate on {{StructType}} columns. 
 It does *not* make much sense to me to add methods to Dataframe that operate 
only on {{StructType}} columns. 
 It does however make sense to me to add methods to Column that operate only on 
{{StructType}} columns and there is lots of precedent for this e.g. the 
existing {{getField}} 

[jira] [Comment Edited] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2020-01-02 Thread fqaiser94 (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007191#comment-17007191
 ] 

fqaiser94 edited comment on SPARK-22231 at 1/3/20 3:14 AM:
---

I feel that the {{withField}} and {{withFieldRenamed}} methods make more sense 
to be on Column rather than Dataframe for the following reasons:

+*Reason #1*+

These methods are intended to only ever operate on {{StructType}} columns. 
 It does *not* make much sense to me to add methods to Dataframe that operate 
only on {{StructType}} columns. 
 It does however make sense to me to add methods to Column that operate only on 
{{StructType}} columns and there is lots of precedent for this e.g. the 
existing {{getField}} method on Column operates only on {{StructType}} columns, 
the existing {{getItem}} method on Column operates only on {{ArrayType}} and 
{{MapType}} columns, etc.

+*Reason #2*+

I'm struggling to see how one would express operations on deeply nested 
StructFields easily if you were to put these methods on Dataframe. Using the 
{{withField}} method I added to Column in my PR, you can add or replace deeply 
nested StructFields like this:
{code:java}
data.show(false)
+---------------------------------+
|a                                |
+---------------------------------+
|[[1, 2, 3], [[4,, 6], [7, 8, 9]]]|
+---------------------------------+

data.withColumn("a", 'a.withField(
  "b", $"a.b".withField(
    "a", $"a.b.a".withField(
      "b", lit(5))))).show(false)
+-----------------------------------+
|a                                  |
+-----------------------------------+
|[[1, 2, 3], [[4, 5, 6], [7, 8, 9]]]|
+-----------------------------------+
{code}
You can see a fully reproducible example of this in my PR: 
[https://github.com/apache/spark/pull/27066]

Another common use-case that would be hard to express by adding the methods to 
Dataframe would be operations on {{ArrayType(StructType)}} or 
{{MapType(StructType, StructType)}} columns. By adding the methods to 
Column, you can express such operations naturally using the recently added 
higher-order functions feature in Spark:
{code:java}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val data = spark.createDataFrame(
  sc.parallelize(Seq(Row(List(Row(1, 2, 3), Row(4, 5, 6))))),
  StructType(Seq(
    StructField("array", ArrayType(StructType(Seq(
      StructField("a", IntegerType),
      StructField("b", IntegerType),
      StructField("c", IntegerType)
    ))))))
).cache

data.show(false)
+----------------------+
|array                 |
+----------------------+
|[[1, 2, 3], [4, 5, 6]]|
+----------------------+

data.withColumn("newArray", transform('array, structElem => 
  structElem.withField("d", lit("hello")))).show(false)
+----------------------+------------------------------------+
|array                 |newArray                            |
+----------------------+------------------------------------+
|[[1, 2, 3], [4, 5, 6]]|[[1, 2, 3, hello], [4, 5, 6, hello]]|
+----------------------+------------------------------------+
{code}
+*Reason #3*+

To add these methods to Dataframe, we would need signatures that would look 
strange compared to the methods available on Dataframe today. Off the top of my 
head, their signatures could look like this:
 * {{def withField(colName: String, structColumn: Column, fieldName: String, field: Column): Dataframe}}
 Returns a new Dataframe with a new Column (colName) containing a copy of 
structColumn with field added/replaced based on name.
 * {{def withFieldRenamed(structColumn: Column, existingFieldName: String, newFieldName: String): Dataframe}}
 Returns a new Dataframe with existingFieldName in structColumn renamed to 
newFieldName.

Maybe these signatures could be refined further? I'm not sure. 

 

For these reasons, it seems more natural and logical to me to have the 
{{withField}} and {{withFieldRenamed}} methods on Column rather than Dataframe. 
Regarding the proposed {{drop}} method, I think it's debatable: 
 * {{drop}} method could be added to Column. 
 I can't at present think of a particular use-case which would necessitate 
adding the {{drop}} method to Column.
 * The existing {{drop}} method on Dataframe could be augmented to support 
dropping StructField if a reference to a StructField is passed to it. I'm not 
sure how challenging this would be to implement but presumably this should be 
possible. 


was (Author: fqaiser94):
I feel strongly that {{withField}} and {{withFieldRenamed}} methods should be 
on Column for the following reasons:

+*Reason #1*+

These methods are intended to only ever operate on {{StructType}} columns. 
 It does *not* make much sense to me to add methods to Dataframe that operate 
only on {{StructType}} columns. 
 It does however make sense to me to add methods to Column that operate only on 
{{StructType}} columns and there is lots of precedent for this e.g. the 
existing {{getField}} method on 

[jira] [Comment Edited] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2020-01-02 Thread fqaiser94 (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007191#comment-17007191
 ] 

fqaiser94 edited comment on SPARK-22231 at 1/3/20 3:07 AM:
---

I feel strongly that {{withField}} and {{withFieldRenamed}} methods should be 
on Column for the following reasons:

+*Reason #1*+

These methods are intended to only ever operate on {{StructType}} columns. 
 It does *not* make much sense to me to add methods to Dataframe that operate 
only on {{StructType}} columns. 
 It does however make sense to me to add methods to Column that operate only on 
{{StructType}} columns and there is lots of precedent for this e.g. the 
existing {{getField}} method on Column operates only on {{StructType}} columns, 
the existing {{getItem}} method on Column operates only on {{ArrayType}} and 
{{MapType}} columns, etc.

+*Reason #2*+

I'm struggling to see how one would express operations on deeply nested 
StructFields easily if you were to put these methods on Dataframe. Using the 
{{withField}} method I added to Column in my PR, you can add or replace deeply 
nested StructFields like this:
{code:java}
data.show(false)
+---------------------------------+
|a                                |
+---------------------------------+
|[[1, 2, 3], [[4,, 6], [7, 8, 9]]]|
+---------------------------------+

data.withColumn("a", 'a.withField(
  "b", $"a.b".withField(
    "a", $"a.b.a".withField(
      "b", lit(5))))).show(false)
+-----------------------------------+
|a                                  |
+-----------------------------------+
|[[1, 2, 3], [[4, 5, 6], [7, 8, 9]]]|
+-----------------------------------+
{code}
You can see a fully reproducible example of this in my PR: 
[https://github.com/apache/spark/pull/27066]

Another common use-case that would be hard to express by adding the methods to 
Dataframe would be operations on {{ArrayType(StructType)}} or 
{{MapType(StructType, StructType)}} columns. By adding the methods to 
Column, you can express such operations naturally using the recently added 
higher-order functions feature in Spark:
{code:java}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val data = spark.createDataFrame(
  sc.parallelize(Seq(Row(List(Row(1, 2, 3), Row(4, 5, 6))))),
  StructType(Seq(
    StructField("array", ArrayType(StructType(Seq(
      StructField("a", IntegerType),
      StructField("b", IntegerType),
      StructField("c", IntegerType)
    ))))))
).cache

data.show(false)
+----------------------+
|array                 |
+----------------------+
|[[1, 2, 3], [4, 5, 6]]|
+----------------------+

data.withColumn("newArray", transform('array, structElem => 
  structElem.withField("d", lit("hello")))).show(false)
+----------------------+------------------------------------+
|array                 |newArray                            |
+----------------------+------------------------------------+
|[[1, 2, 3], [4, 5, 6]]|[[1, 2, 3, hello], [4, 5, 6, hello]]|
+----------------------+------------------------------------+
{code}
+*Reason #3*+

To add these methods to Dataframe, we would need signatures that would look 
strange compared to the methods available on Dataframe today. Off the top of my 
head, their signatures could look like this:
 * {{def withField(colName: String, structColumn: Column, fieldName: String, field: Column): Dataframe}}
 Returns a new Dataframe with a new Column (colName) containing a copy of 
structColumn with field added/replaced based on name.
 * {{def withFieldRenamed(structColumn: Column, existingFieldName: String, newFieldName: String): Dataframe}}
 Returns a new Dataframe with existingFieldName in structColumn renamed to 
newFieldName.

Maybe these signatures could be refined further? I'm not sure. 

 

For these reasons, it seems more natural and logical to me to have the 
{{withField}} and {{withFieldRenamed}} methods on Column rather than Dataframe. 
Regarding the proposed {{drop}} method, I think it's debatable: 
 * {{drop}} method could be added to Column. 
 I can't at present think of a particular use-case which would necessitate 
adding the {{drop}} method to Column.
 * The existing {{drop}} method on Dataframe could be augmented to support 
dropping StructField if a reference to a StructField is passed to it. I'm not 
sure how challenging this would be to implement but presumably this should be 
possible. 


was (Author: fqaiser94):
I feel strongly that {{withField}} and {{withFieldRenamed}} methods should be 
on Column for the following reasons:

+*Reason #1*+

These methods are intended to only ever operate on {{StructType}} columns. 
 It does *not* make much sense to me to add methods to Dataframe that operate 
only on {{StructType}} columns. 
 It does however make sense to me to add methods to Column that operate only on 
{{StructType}} columns and there is lots of precedent for this e.g. the 
existing {{getField}} method on Column operates only on 

[jira] [Comment Edited] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2020-01-02 Thread fqaiser94 (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17007116#comment-17007116
 ] 

fqaiser94 edited comment on SPARK-22231 at 1/2/20 11:25 PM:


Hi folks, I can personally affirm that this would be a valuable feature to have 
in Spark. Looking around, it's clear to me that other people have a need for 
this feature as well, e.g. SPARK-16483, the [mailing 
list|http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-SQL-Applying-transformation-on-a-struct-inside-an-array-td18934.html],
 etc. 

A few things have changed since the last comments in this discussion. Support 
for higher-order functions has been 
[added|https://databricks.com/blog/2018/11/16/introducing-new-built-in-functions-and-higher-order-functions-for-complex-data-types-in-apache-spark.html]
 to the Apache Spark project and in particular, there are now {{transform}} and 
{{filter}} functions available for operating on ArrayType columns. These would 
be the equivalent of the {{mapItems}} and {{filterItems}} functions that were 
previously proposed in this ticket. To complete this ticket then, I think all 
that is needed is to add the {{withField}}, {{withFieldRenamed}}, and {{drop}} 
methods to the Column class. Summarizing the discussion so far, the signatures 
for these new methods should be as follows: 
 - {{def withField(fieldName: String, field: Column): Column}}
 Returns a new StructType column with field added/replaced based on name.

 - {{def drop(fieldNames: String*)}}
 Returns a new StructType column with field dropped.

 - {{def withFieldRenamed(existingName: String, newName: String): Column}}
 Returns a new StructType column with field renamed.
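
To make the proposal concrete, usage of these (not yet merged) methods on a 
struct column might look roughly like this; the column and field names are made 
up for illustration:

{code:java}
// Hypothetical: none of withField/drop/withFieldRenamed exist on Column yet.
// Assumes df has a StructType column "a" with fields "x", "y", and "z".
val updated = df.withColumn("a", $"a"
  .withField("w", $"a.x" + 1)          // add or replace field "w"
  .withFieldRenamed("y", "renamed_y")  // rename field "y"
  .drop("z"))                          // drop field "z"
{code}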

Since it didn't seem like anybody was actively working on this, I went ahead 
and created a pull request to add a {{withField}} method to the {{Column}} 
class that conforms with the specs discussed in this ticket. You can review the 
PR here: [https://github.com/apache/spark/pull/27066]

As this is my first PR to the Apache Spark project, I wanted to keep the PR 
small. However, I wouldn't mind writing the {{drop}} and {{withFieldRenamed}} 
methods as well in separate PRs once the current PR is accepted. 


was (Author: fqaiser94):
Hi folks, I can personally affirm that this would be a valuable feature to have 
in Spark. Looking around, its clear to me that other people have a need for 
this feature as well e.g. SPARK-16483,  [Mailing 
list|http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-SQL-Applying-transformation-on-a-struct-inside-an-array-td18934.html],
 etc. 

A few things have changed since the last comments in this discussion. Support 
for higher-order functions has been 
[added|https://databricks.com/blog/2018/11/16/introducing-new-built-in-functions-and-higher-order-functions-for-complex-data-types-in-apache-spark.html]
 to the Apache Spark project and in particular, there are now {{transform}} and 
{{filter}} functions available for operating on ArrayType columns. These would 
be the equivalent of the {{mapItems}} and {{filterItems}} functions that were 
previously proposed in this ticket. To complete this ticket then, I think all 
that is needed is adding the {{withField}}, {{withFieldRenamed}}, and {{drop}} 
methods to the Column class. Looking through the discussion, I can summarize 
the signatures for these new methods should be as follows: 
 - {{def withField(fieldName: String, field: Column): Column}}
 Returns a new StructType column with field added/replaced based on name.

 - {{def drop(colNames: String*)}}
 Returns a new StructType column with field dropped.

 - {{def withFieldRenamed(existingName: String, newName: String): Column}}
 Returns a new StructType column with field renamed.

Since it didn't seem like anybody was actively working on this, I went ahead 
and created a pull request to add a {{withField}} method to the {{Column}} 
class that conforms with the specs discussed in this ticket. You can review the 
PR here: [https://github.com/apache/spark/pull/27066]

As this is my first PR to the Apache Spark project, I wanted to keep the PR 
small. However, I wouldn't mind writing the {{drop}} and {{withFieldRenamed}} 
methods as well in separate PRs once the current PR is accepted. 


[jira] [Comment Edited] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2019-01-04 Thread Timothy Pharo (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16734160#comment-16734160
 ] 

Timothy Pharo edited comment on SPARK-22231 at 1/4/19 2:15 PM:
---

[~dbtsai] and [~jeremyrsmith], this all looks great and is just what we have 
been looking for.  As this isn't yet available in Spark upstream and is not 
likely to be available any time soon, is there any plan to expose this 
separately in the interim?


was (Author: timothy pharo):
[~dbtsai] and [~jeremyrsmith], this all looks great and is just what we have 
been looking.  As this isn't yet available in Spark upstream and is not likely 
to be available any time soon, is there any plan to expose this separately in 
the interim?


[jira] [Comment Edited] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-11-14 Thread DB Tsai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16252611#comment-16252611
 ] 

DB Tsai edited comment on SPARK-22231 at 11/14/17 10:45 PM:


Thanks [~jeremyrsmith] for adding more details. There are a couple of technical 
challenges, as [~jeremyrsmith] pointed out; we can agree on what the APIs should 
look like in this JIRA first, and then address those challenges in PRs. 

# [~rxin], with {{dropColumn}} support in {{Column}}, we can have some shared 
building blocks on {{Column}}, like 
{code}
def computeNewFeature: Column => Column = (col: Column) => {
  col.withColumn(col("feature") * 2 as "newFeature").dropColumn("feature")
}
{code} I agree that this can be achieved on the dataframe with {{drop}} as well, 
but sometimes it's more convenient to work with the {{Column}} API. Plus, as 
[~nkronenfeld] pointed out, it's nice to have {{Column}} and {{Dataset}} 
share some signatures. 
# In our internal implementation, we use {{Column.withField}} instead of 
{{Column.withColumn}}, as discussed above. Since I prefer to have both 
{{Column}} and {{Dataset}} share the same method names, in the example I 
wrote in this JIRA I used {{Column.withColumn}}. But I agree that 
{{Column.withField}} is less confusing. Any more feedback on this?
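
For comparison, the same building block written with the {{withField}} naming 
would read like the following; purely illustrative, since neither method exists 
on {{Column}} yet:

{code}
// Hypothetical naming only; mirrors the computeNewFeature example above.
def computeNewFeature: Column => Column = (col: Column) =>
  col.withField("newFeature", col("feature") * 2).dropColumn("feature")
{code}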



was (Author: dbtsai):
Thanks [~jeremyrsmith] for adding more details. There are couple technical 
challenging as [~jeremyrsmith] pointed out; we can agree on what APIs should 
look like in this JIRA first, and then address those challenging in PRs. 

# [~rxin], with {{dropColumn}} support in {{Column}}, we can have some share 
building block on {{Column}} like 
{code}
def computeNewFeature: Column => Column = (col: Column) => {
  col.withColumn(col("feature") * 2 as "newFeature").dropColumn("feature")
}
{code} I agree that this can be achieved with dataframe with {{drop}} as well, 
but sometimes, it's more convenient to work with {{Column}} API. Plus, as 
[~nkronenfeld] pointed out, it's nice to have {{Column}} and {{Dataset}} to 
share some signatures. 
# In our internal implementation, we use {{Column.withField}} instead of 
{{Column.withColumn}}. Since I prefer to have both {{Column}} and {{Dataset}} 
sharing the same method names, so in the example I wrote above, I use 
{{Column.withColumn}}. But I agree that {{Column.withField}} is less confusing. 
Anymore feedback on this?



[jira] [Comment Edited] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-14 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199789#comment-16199789
 ] 

Liang-Chi Hsieh edited comment on SPARK-22231 at 10/15/17 4:02 AM:
---

[~jeremyrsmith] Thanks for the context. Regarding the versions for the 
`Column`, I have a question:

You said the input `Column` to the lambda represents an element of the array. 
If so, should we call it `Element` instead of `Column`? Because it doesn't 
sound like a column at all.




was (Author: viirya):
[~Jeremy Smith] Thanks for the context. Regarding the versions for the 
`Column`, I have a question:

You said the input `Column` to the lambda represents an element of the array. 
If so, should we call it `Element` instead of `Column`? Because it sounds not a 
column at all.




[jira] [Comment Edited] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-10 Thread Nathan Kronenfeld (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198861#comment-16198861
 ] 

Nathan Kronenfeld edited comment on SPARK-22231 at 10/10/17 7:39 PM:
-

A couple of related concerns... 

# I think Column.withColumn and Dataset.withColumn should have the same 
signature.  I don't really care which it uses - your pattern works just as well 
as the pattern currently used by Dataset - but consistency between the two 
would be nice.
Perhaps even we could allow either pattern in both places?
# Similarly, perhaps Column should also get a withColumnRenamed method?
# Similarly, is there any reason mapItems should work at the column level, but 
not at the Dataset level? 

In response to [~jeremyrsmith] below:
re: using withField instead of withColumn - that seems perfectly reasonable (I 
could accept either way); the description of this issue uses withColumn, so that 
would probably want to be updated then.
re: mapItems on a row: I was thinking more of it acting on the row itself, 
looking at it as an array of objects.  Changing 'withColumn' to 'withField' may 
make a sharp enough break to make this a non-issue, though.
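
To make concern #1 concrete, the two signature styles side by side (the 
Column-level call is hypothetical, since no such method exists yet, and 
structCol is an arbitrary StructType column):

{code:java}
// Existing Dataset-level pattern: explicit name first, then the value expression.
df.withColumn("newFeature", $"feature" * 2)

// Column-level pattern proposed in this ticket: the value carries its own name via "as".
structCol.withColumn(structCol("feature") * 2 as "newFeature")
{code}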


was (Author: nkronenfeld):
One couple related concerns... 

# I think Column.withColumn and Dataset.withColumn should have the same 
signature.  I don't really care which it uses - your pattern works just as well 
as the pattern currently used by Dataset - but consistency between the two 
would be nice.
Perhaps even we could allow either pattern in both places?
# Similarly, perhaps Column should also get a withColumnRenamed method?
# Similarly, is there any reason mapItems should work at the column level, but 
not at the Dataset level? 
In response to [~jeremyrsmith] below:
re: using withField instead of withColumn - that seems perfectly reasonable (I 
could accept either way); the description of this issue uses withColumn, that 
probably want to be updated then.
re: mapItems on a row: I was thinking more of it acting on the row itself, 
looking at it as an array of objects.  Changing 'withColumn' to 'withField' may 
make a sharp enough break to make this a non-issue, though.


[jira] [Comment Edited] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-10 Thread Nathan Kronenfeld (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198861#comment-16198861
 ] 

Nathan Kronenfeld edited comment on SPARK-22231 at 10/10/17 7:38 PM:
-

One couple related concerns... 

# I think Column.withColumn and Dataset.withColumn should have the same 
signature.  I don't really care which it uses - your pattern works just as well 
as the pattern currently used by Dataset - but consistency between the two 
would be nice.
Perhaps even we could allow either pattern in both places?
# Similarly, perhaps Column should also get a withColumnRenamed method?
# Similarly, is there any reason mapItems should work at the column level, but 
not at the Dataset level? 
In response to [~jeremyrsmith] below:
re: using withField instead of withColumn - that seems perfectly reasonable (I 
could accept either way); the description of this issue uses withColumn, that 
probably want to be updated then.
re: mapItems on a row: I was thinking more of it acting on the row itself, 
looking at it as an array of objects.  Changing 'withColumn' to 'withField' may 
make a sharp enough break to make this a non-issue, though.


was (Author: nkronenfeld):
One couple related concerns... 

# I think Column.withColumn and Dataset.withColumn should have the same 
signature.  I don't really care which it uses - your pattern works just as well 
as the pattern currently used by Dataset - but consistency between the two 
would be nice.
Perhaps even we could allow either pattern in both places?
# Similarly, perhaps Column should also get a withColumnRenamed method?
# Similarly, is there any reason mapItems should work at the column level, but 
not at the Dataset level? 


[jira] [Comment Edited] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-09 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198096#comment-16198096
 ] 

Liang-Chi Hsieh edited comment on SPARK-22231 at 10/10/17 3:28 AM:
---

Looks like {{mapItems}} is an API that can work on any array-type column?

When will we detect possible errors in the operations on the element? Will we do 
an analysis check, or will we only know at runtime? For example, if we call 
{{item.drop("b")}} but there is no such column in {{item}}, will we know it 
before running the job?
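
As a sketch of the concern (using the proposed {{mapItems}}/{{drop}} API from 
the description, so nothing here is runnable today):
{code}
// Suppose the element struct of "items" only has fields "a" and "b".
val result = df.mapItems("items") { item =>
  item.drop("c")   // no field "c" exists in the element struct
}
// Is this rejected while the plan is analyzed and resolved against the schema,
// or does it only fail once the job actually runs on the executors?
{code}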


was (Author: viirya):
Looks like `mapItems` is an API that can work on any array-type column?

When will we detect possible errors in the operations on the element? Will we do 
an analysis check, or will we only know at runtime? For example, if we call 
`item.drop("b")` but there is no such column in `item`, will we know it before 
running the job?


[jira] [Comment Edited] (SPARK-22231) Support of map, filter, withColumn, dropColumn in nested list of structures

2017-10-09 Thread Liang-Chi Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16198082#comment-16198082
 ] 

Liang-Chi Hsieh edited comment on SPARK-22231 at 10/10/17 3:18 AM:
---

I think there is a typo in the second example, the one that adds a new column; it should be:
{code}
val result = df.mapItems("items") {
  item => item.withColumn(item("b") + 1 as "c")
}
{code}
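
With that change, and assuming the proposed API behaves as the description 
suggests, the element struct would presumably gain a new field {{c}} alongside 
{{a}} and {{b}}; a sketch of the expected schema:
{code}
// Hypothetical resulting schema (sketch, not actual output):
// root
//  |-- foo: integer (nullable = false)
//  |-- bar: double (nullable = false)
//  |-- items: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- a: integer (nullable = true)
//  |    |    |-- b: double (nullable = true)
//  |    |    |-- c: double (nullable = true)
{code}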


was (Author: viirya):
I think there is a typo in the second example, the one that adds a new column; it should be:
{code:scala}
val result = df.mapItems("items") {
  item => item.withColumn(item("b") + 1 as "c")
}
{code}

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---
>
> Key: SPARK-22231
> URL: https://issues.apache.org/jira/browse/SPARK-22231
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: DB Tsai
>
> At Netflix's algorithm team, we work on ranking problems to find great 
> content to fulfill the unique tastes of our members. Before building 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the context describing the setting where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema looks like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  ||-- element: struct (containsNull = false)
>  |||-- title_id: integer (nullable = true)
>  |||-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions to them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into an RDD to work on the nested level of data, and 
> then reconstruct the new dataframe as a workaround. This is extremely 
> inefficient because all the optimizations like predicate pushdown in SQL 
> cannot be performed, we cannot leverage the columnar format, and the 
> serialization and deserialization cost becomes really huge even when we just 
> want to add a new column at the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to open source it in Spark upstream. We would like to socialize the 
> API design to see if we missed any use cases.  
> The first API we added is *mapItems* on dataframe, which takes a function from 
> *Column* to *Column* and applies it to the nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: double (containsNull = true)
> result.show()
> // +---+----+--------------------+
> // |foo| bar|               items|
> // +---+----+--------------------+
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+----+--------------------+
> {code}
> Now, with the ability to apply a function to the nested dataframe, we can 
> add a new function, *withColumn*, on *Column* to add a column or replace an 
> existing column of the same name in the nested list of structs. Here are two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces an existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // ||-- element: struct