[jira] [Updated] (SPARK-37201) Spark SQL reads unnecessary nested fields (filter after explode)

2021-11-04 Thread Hyukjin Kwon (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-37201:
-
Issue Type: Improvement  (was: Bug)

> Spark SQL reads unnecessary nested fields (filter after explode)
> 
>
> Key: SPARK-37201
> URL: https://issues.apache.org/jira/browse/SPARK-37201
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Sergey Kotlov
>Priority: Major
>
> In this example, Spark still reads unnecessary nested fields.
> Data preparation:
> {code:java}
> case class Struct(v1: String, v2: String, v3: String)
> case class Event(struct: Struct, array: Seq[String])
> Seq(
>   Event(Struct("v1","v2","v3"), Seq("cx1", "cx2"))
> ).toDF().write.mode("overwrite").saveAsTable("table")
> {code}
> The v2 and v3 columns aren't needed here, but they still appear in the physical plan.
> {code:java}
> spark.table("table")
>   .select($"struct.v1", explode($"array").as("el"))
>   .filter($"el" === "cx1")
>   .explain(true)
>  
> == Physical Plan ==
> ... ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,array:array<string>>
> {code}
> If you just remove the _filter_ or move the _explode_ to a second _select_, 
> everything is fine:
> {code:java}
> spark.table("table")
>   .select($"struct.v1", explode($"array").as("el"))
>   //.filter($"el" === "cx1")
>   .explain(true)
>   
> // ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
> spark.table("table")
>   .select($"struct.v1", $"array")
>   .select($"v1", explode($"array").as("el"))
>   .filter($"el" === "cx1")
>   .explain(true)
>   
> // ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
> {code}
>  
> *Yet another example: left_anti join after double select:*
> {code:java}
> case class Struct(v1: String, v2: String, v3: String)
> case class Event(struct: Struct, field1: String, field2: String)
> Seq(
>   Event(Struct("v1","v2","v3"), "fld1", "fld2")
> ).toDF().write.mode("overwrite").saveAsTable("table")
> val joinDf = Seq("id1").toDF("id")
> spark.table("table")
>   .select("struct", "field1")
>   .select($"struct.v1", $"field1")
>   .join(joinDf, $"field1" === joinDf("id"), "left_anti")
>   .explain(true)
> // ===> ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,field1:string>
> {code}
> Instead of the first select, other manipulations of the original df, for 
> example {color:#00875a}.withColumn("field3", lit("f3")){color} or 
> {color:#00875a}.drop("field2"){color}, also lead to reading unnecessary 
> nested fields from _struct_.
> But if you just remove the first select or change the join type, the nested 
> fields are pruned correctly:
> {code:java}
> // .select("struct", "field1")
> ===> ReadSchema: struct<struct:struct<v1:string>,field1:string>
> .join(joinDf, $"field1" === joinDf("id"), "left")
> ===> ReadSchema: struct<struct:struct<v1:string>,field1:string>
> {code}
> PS: The first select might look strange in the context of this example, but 
> in a real system it might be part of a common API that other parts of the 
> system use, adding their own expressions on top of it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-37201) Spark SQL reads unnecessary nested fields (filter after explode)

2021-11-03 Thread Sergey Kotlov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Kotlov updated SPARK-37201:
--
Description: 
In this example, Spark still reads unnecessary nested fields.

Data preparation:
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, array: Seq[String])

Seq(
  Event(Struct("v1","v2","v3"), Seq("cx1", "cx2"))
).toDF().write.mode("overwrite").saveAsTable("table")
{code}
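For reference, the table schema this preparation produces can be confirmed with printSchema (a sketch, assuming the same spark session as above):
{code:java}
spark.table("table").printSchema()
// root
//  |-- struct: struct (nullable = true)
//  |    |-- v1: string (nullable = true)
//  |    |-- v2: string (nullable = true)
//  |    |-- v3: string (nullable = true)
//  |-- array: array (nullable = true)
//  |    |-- element: string (containsNull = true)
{code}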
The v2 and v3 columns aren't needed here, but they still appear in the physical plan.
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
 
== Physical Plan ==
... ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,array:array<string>>

{code}
If you just remove the _filter_ or move the _explode_ to a second _select_, 
everything is fine:
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  //.filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>

spark.table("table")
  .select($"struct.v1", $"array")
  .select($"v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
{code}
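
For reference, nested schema pruning is governed by the spark.sql.optimizer.nestedSchemaPruning.enabled flag, which is on by default in Spark 3.x, so the behaviour above is not simply a disabled optimization. A minimal check (a sketch, assuming a live spark session with implicits imported and the "table" created above):
{code:java}
import org.apache.spark.sql.functions.explode

// The nested-pruning rule is enabled by default in Spark 3.x:
println(spark.conf.get("spark.sql.optimizer.nestedSchemaPruning.enabled")) // true

// Yet the filter-after-explode plan above still reads struct.v2 and struct.v3.
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
{code}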
 

*Yet another example: left_anti join after double select:*
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, field1: String, field2: String)
Seq(
  Event(Struct("v1","v2","v3"), "fld1", "fld2")
).toDF().write.mode("overwrite").saveAsTable("table")
val joinDf = Seq("id1").toDF("id")

spark.table("table")
  .select("struct", "field1")
  .select($"struct.v1", $"field1")
  .join(joinDf, $"field1" === joinDf("id"), "left_anti")
  .explain(true)

// ===> ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,field1:string>
{code}
Instead of the first select, other manipulations of the original df, for 
example {color:#00875a}.withColumn("field3", lit("f3")){color} or 
{color:#00875a}.drop("field2"){color}, also lead to reading unnecessary nested 
fields from _struct_.

But if you just remove the first select or change the join type, the nested 
fields are pruned correctly:
{code:java}
// .select("struct", "field1")
===> ReadSchema: struct<struct:struct<v1:string>,field1:string>

.join(joinDf, $"field1" === joinDf("id"), "left")
===> ReadSchema: struct<struct:struct<v1:string>,field1:string>
{code}
PS: The first select might look strange in the context of this example, but in 
a real system it might be part of a common API that other parts of the system 
use, adding their own expressions on top of it.
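
As a hypothetical illustration of that pattern (loadEvents is an invented name for the sketch), a shared helper that performs the first select would expose every caller to the same unpruned read:
{code:java}
import org.apache.spark.sql.DataFrame

// Common layer: pre-selects the columns it considers public.
def loadEvents(): DataFrame =
  spark.table("table").select("struct", "field1")

// Caller: only needs struct.v1, but the scan still reads v2 and v3.
loadEvents()
  .select($"struct.v1", $"field1")
  .join(joinDf, $"field1" === joinDf("id"), "left_anti")
  .explain(true)
{code}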

  was:
In this example, Spark still reads unnecessary nested fields.

Data preparation:
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, array: Seq[String])

Seq(
  Event(Struct("v1","v2","v3"), Seq("cx1", "cx2"))
).toDF().write.mode("overwrite").saveAsTable("table")
{code}
The v2 and v3 columns aren't needed here, but they still appear in the physical plan.
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
 
== Physical Plan ==
... ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,array:array<string>>

{code}
If you just remove the _filter_ or move the _explode_ to a second _select_, 
everything is fine:
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  //.filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>

spark.table("table")
  .select($"struct.v1", $"array")
  .select($"v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
{code}
 

*Yet another example: left_anti join after double select:*
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, field1: String, field2: String)
Seq(
  Event(Struct("v1","v2","v3"), "fld1", "fld2")
).toDF().write.mode("overwrite").saveAsTable("table")
val joinDf = Seq("id1").toDF("id")

spark.table("table")
  .select("struct", "field1")
  .select($"struct.v1", $"field1")
  .join(joinDf, $"field1" === joinDf("id"), "left_anti")
  .explain(true)

// ===> ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,field1:string>
{code}
Instead of the first select, other manipulations of the original df, for 
example .withColumn("field3", lit("f3")) or .drop("field2"), also lead to 
reading unnecessary nested fields from _struct_.

But if you just remove the first select or change the join type, the nested 
fields are pruned correctly:
{code:java}
// .select("struct", "field1")
===> ReadSchema: struct<struct:struct<v1:string>,field1:string>

.join(joinDf, $"field1" === joinDf("id"), "left")
===> ReadSchema: struct<struct:struct<v1:string>,field1:string>
{code}
PS: The first select might look strange in the context of this example, but in 
a real system it might be part of a common API that other parts of the system 
use, adding their own expressions on top of it.


[jira] [Updated] (SPARK-37201) Spark SQL reads unnecessary nested fields (filter after explode)

2021-11-03 Thread Sergey Kotlov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Kotlov updated SPARK-37201:
--
Description: 
In this example, Spark still reads unnecessary nested fields.

Data preparation:
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, array: Seq[String])

Seq(
  Event(Struct("v1","v2","v3"), Seq("cx1", "cx2"))
).toDF().write.mode("overwrite").saveAsTable("table")
{code}
The v2 and v3 columns aren't needed here, but they still appear in the physical plan.
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
 
== Physical Plan ==
... ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,array:array<string>>

{code}
If you just remove the _filter_ or move the _explode_ to a second _select_, 
everything is fine:
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  //.filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>

spark.table("table")
  .select($"struct.v1", $"array")
  .select($"v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
{code}
 

*Yet another example: left_anti join after double select:*
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, field1: String, field2: String)
Seq(
  Event(Struct("v1","v2","v3"), "fld1", "fld2")
).toDF().write.mode("overwrite").saveAsTable("table")
val joinDf = Seq("id1").toDF("id")

spark.table("table")
  .select("struct", "field1")
  .select($"struct.v1", $"field1")
  .join(joinDf, $"field1" === joinDf("id"), "left_anti")
  .explain(true)

// ===> ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,field1:string>
{code}
Instead of the first select, other manipulations of the original df, for 
example .withColumn("field3", lit("f3")) or .drop("field2"), also lead to 
reading unnecessary nested fields from _struct_.

But if you just remove the first select or change the join type, the nested 
fields are pruned correctly:
{code:java}
// .select("struct", "field1")
===> ReadSchema: struct<struct:struct<v1:string>,field1:string>

.join(joinDf, $"field1" === joinDf("id"), "left")
===> ReadSchema: struct<struct:struct<v1:string>,field1:string>
{code}
PS: The first select might look strange in the context of this example, but in 
a real system it might be part of a common API that other parts of the system 
use, adding their own expressions on top of it.

  was:
In this example, Spark still reads unnecessary nested fields.

Data preparation:
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, array: Seq[String])

Seq(
  Event(Struct("v1","v2","v3"), Seq("cx1", "cx2"))
).toDF().write.mode("overwrite").saveAsTable("table")
{code}
The v2 and v3 columns aren't needed here, but they still appear in the physical plan.
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
 
== Physical Plan ==
... ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,array:array<string>>

{code}
If you just remove the _filter_ or move the _explode_ to a second _select_, 
everything is fine:
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  //.filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>

spark.table("table")
  .select($"struct.v1", $"array")
  .select($"v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
{code}
 

*Yet another example: left_anti join after double select:*
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, field1: String, field2: String)
Seq(
  Event(Struct("v1","v2","v3"), "fld1", "fld2")
).toDF().write.mode("overwrite").saveAsTable("table")
val joinDf = Seq("id1").toDF("id")

spark.table("table")
  .select("struct", "field1")
  .select($"struct.v1", $"field1")
  .join(joinDf, $"field1" === joinDf("id"), "left_anti")
  .explain(true)

// ===> ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,field1:string>
{code}
If you just remove the first select or change the join type, the nested fields 
are pruned correctly:
{code:java}
// .select("struct", "field1")
===> ReadSchema: struct<struct:struct<v1:string>,field1:string>

.join(joinDf, $"field1" === joinDf("id"), "left")
===> ReadSchema: struct<struct:struct<v1:string>,field1:string>
{code}
PS: The first select might look strange in the context of this example, but in 
a real system it might be part of a common API that other parts of the system 
use, adding their own expressions on top of it.


[jira] [Updated] (SPARK-37201) Spark SQL reads unnecessary nested fields (filter after explode)

2021-11-03 Thread Sergey Kotlov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Kotlov updated SPARK-37201:
--
Description: 
In this example, Spark still reads unnecessary nested fields.

Data preparation:
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, array: Seq[String])

Seq(
  Event(Struct("v1","v2","v3"), Seq("cx1", "cx2"))
).toDF().write.mode("overwrite").saveAsTable("table")
{code}
The v2 and v3 columns aren't needed here, but they still appear in the physical plan.
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
 
== Physical Plan ==
... ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,array:array<string>>

{code}
If you just remove the _filter_ or move the _explode_ to a second _select_, 
everything is fine:
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  //.filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>

spark.table("table")
  .select($"struct.v1", $"array")
  .select($"v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
{code}
 

*Yet another example: left_anti join after double select:*
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, field1: String, field2: String)
Seq(
  Event(Struct("v1","v2","v3"), "fld1", "fld2")
).toDF().write.mode("overwrite").saveAsTable("table")
val joinDf = Seq("id1").toDF("id")

spark.table("table")
  .select("struct", "field1")
  .select($"struct.v1", $"field1")
  .join(joinDf, $"field1" === joinDf("id"), "left_anti")
  .explain(true)

// ===> ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,field1:string>
{code}
If you just remove the first select or change the join type, the nested fields 
are pruned correctly:
{code:java}
// .select("struct", "field1")
===> ReadSchema: struct<struct:struct<v1:string>,field1:string>

.join(joinDf, $"field1" === joinDf("id"), "left")
===> ReadSchema: struct<struct:struct<v1:string>,field1:string>
{code}
PS: The first select might look strange in the context of this example, but in 
a real system it might be part of a common API that other parts of the system 
use, adding their own expressions on top of it.

  was:
In this example, Spark still reads unnecessary nested fields.

Data preparation:
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, array: Seq[String])

Seq(
  Event(Struct("v1","v2","v3"), Seq("cx1", "cx2"))
).toDF().write.mode("overwrite").saveAsTable("table")
{code}
 

The v2 and v3 columns aren't needed here, but they still appear in the physical plan.
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
 
== Physical Plan ==
... ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,array:array<string>>

{code}
If you just remove the _filter_ or move the _explode_ to a second _select_, 
everything is fine:
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  //.filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>

spark.table("table")
  .select($"struct.v1", $"array")
  .select($"v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
{code}


[jira] [Updated] (SPARK-37201) Spark SQL reads unnecessary nested fields (filter after explode)

2021-11-02 Thread Sergey Kotlov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Kotlov updated SPARK-37201:
--
Summary: Spark SQL reads unnecessary nested fields (filter after explode)  
(was: Spark SQL reads unnecсessary nested fields (filter after explode))

> Spark SQL reads unnecessary nested fields (filter after explode)
> 
>
> Key: SPARK-37201
> URL: https://issues.apache.org/jira/browse/SPARK-37201
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Sergey Kotlov
>Priority: Major
>
> In this example, Spark still reads unnecessary nested fields.
> Data preparation:
>  
> {code:java}
> case class Struct(v1: String, v2: String, v3: String)
> case class Event(struct: Struct, array: Seq[String])
> Seq(
>   Event(Struct("v1","v2","v3"), Seq("cx1", "cx2"))
> ).toDF().write.mode("overwrite").saveAsTable("table")
> {code}
>  
> The v2 and v3 columns aren't needed here, but they still appear in the physical plan.
> {code:java}
> spark.table("table")
>   .select($"struct.v1", explode($"array").as("el"))
>   .filter($"el" === "cx1")
>   .explain(true)
>  
> == Physical Plan ==
> ... ReadSchema: struct<struct:struct<v1:string,v2:string,v3:string>,array:array<string>>
> {code}
> If you just remove the _filter_ or move the _explode_ to a second _select_, 
> everything is fine:
> {code:java}
> spark.table("table")
>   .select($"struct.v1", explode($"array").as("el"))
>   //.filter($"el" === "cx1")
>   .explain(true)
>   
> // ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
> spark.table("table")
>   .select($"struct.v1", $"array")
>   .select($"v1", explode($"array").as("el"))
>   .filter($"el" === "cx1")
>   .explain(true)
>   
> // ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
> {code}


