[ https://issues.apache.org/jira/browse/SPARK-44805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17754344#comment-17754344 ]

Bruce Robbins edited comment on SPARK-44805 at 8/15/23 12:26 AM:
-----------------------------------------------------------------

[~sunchao] 

It appears to be a bad interaction between the Parquet nested column 
vectorized reader and the {{Cast}} expression:
{noformat}
drop table if exists t1;

create table t1 using parquet as
select * from values
(named_struct('f1', array(1, 2, 3), 'f2', array(1, 1, 2)))
as (value);

select value from t1;
{"f1":[1,2,3],"f2":[1,1,2]}         <== this is expected
Time taken: 0.126 seconds, Fetched 1 row(s)

select cast(value as struct<f1:array<double>,f2:array<int>>) AS value from t1;
{"f1":[1.0,2.0,3.0],"f2":[0,0,0]}   <== this is not expected
Time taken: 0.102 seconds, Fetched 1 row(s)

set spark.sql.parquet.enableNestedColumnVectorizedReader=false;

select cast(value as struct<f1:array<double>,f2:array<int>>) AS value from t1;
{"f1":[1.0,2.0,3.0],"f2":[1,1,2]}   <== now has expected value
Time taken: 0.244 seconds, Fetched 1 row(s)
{noformat}
The union operation adds this {{Cast}} expression because {{value}} has 
different data types in your two DataFrames.
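
Until this is fixed, a minimal PySpark workaround sketch (assuming the 
{{parquet1}}/{{parquet2}} DataFrames from the reproduction quoted below) is to 
disable the nested column vectorized reader before running the union:
{code:python}
# Workaround sketch: turn off the Parquet nested column vectorized reader so
# the Cast inserted by union() no longer zeroes out the second array field.
# This trades away vectorized-read performance for nested columns.
spark.conf.set("spark.sql.parquet.enableNestedColumnVectorizedReader", "false")

out = parquet1.union(parquet2)
out.select("value.f2").distinct().show()  # expected: [1, 1, 2] and [1, 1]
{code}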



> Data lost after union using spark.sql.parquet.enableNestedColumnVectorizedReader=true
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-44805
>                 URL: https://issues.apache.org/jira/browse/SPARK-44805
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.3.1
>            Environment: PySpark, Linux, Hadoop, Parquet
>            Reporter: Jakub Wozniak
>            Priority: Major
>
> When unioning two DataFrames read from Parquet that contain nested structures 
> (a struct with two array fields, one of doubles and one of integers), the data 
> of the second field seems to be lost (zeros are returned instead). 
> This seems to happen only when the nested vectorized reader is used 
> (spark.sql.parquet.enableNestedColumnVectorizedReader=true). 
> The following Python code reproduces the problem: 
> {code:python}
> from pyspark.sql import SparkSession
> from pyspark.sql.types import *
> # PREPARING DATA
> data1 = []
> data2 = []
> for i in range(2):
>     data1.append((([1, 2, 3], [1, 1, 2]), i))
>     data2.append((([1.0, 2.0, 3.0], [1, 1]), i + 10))
> schema1 = StructType([
>     StructField('value', StructType([
>         StructField('f1', ArrayType(IntegerType()), True),
>         StructField('f2', ArrayType(IntegerType()), True)
>     ])),
>     StructField('id', IntegerType(), True)
> ])
> schema2 = StructType([
>     StructField('value', StructType([
>         StructField('f1', ArrayType(DoubleType()), True),
>         StructField('f2', ArrayType(IntegerType()), True)
>     ])),
>     StructField('id', IntegerType(), True)
> ])
> spark = SparkSession.builder.getOrCreate()
> data_dir = "/user/<user>/"
> df1 = spark.createDataFrame(data1, schema1)
> df1.write.mode('overwrite').parquet(data_dir + "data1") 
> df2 = spark.createDataFrame(data2, schema2)
> df2.write.mode('overwrite').parquet(data_dir + "data2") 
> # READING DATA
> parquet1 = spark.read.parquet(data_dir + "data1")
> parquet2 = spark.read.parquet(data_dir + "data2")
> # UNION
> out = parquet1.union(parquet2)
> parquet1.select("value.f2").distinct().show()
> out.select("value.f2").distinct().show()
> print(parquet1.collect())
> print(out.collect())
> {code}
> Output: 
> {code}
> +---------+
> |       f2|
> +---------+
> |[1, 1, 2]|
> +---------+
> +---------+
> |       f2|
> +---------+
> |[0, 0, 0]|
> |   [1, 1]|
> +---------+
> [
> Row(value=Row(f1=[1, 2, 3], f2=[1, 1, 2]), id=0), 
> Row(value=Row(f1=[1, 2, 3], f2=[1, 1, 2]), id=1)
> ]
> [
> Row(value=Row(f1=[1.0, 2.0, 3.0], f2=[0, 0, 0]), id=0), 
> Row(value=Row(f1=[1.0, 2.0, 3.0], f2=[0, 0, 0]), id=1), 
> Row(value=Row(f1=[1.0, 2.0, 3.0], f2=[1, 1]), id=10), 
> Row(value=Row(f1=[1.0, 2.0, 3.0], f2=[1, 1]), id=11)
> ]
> {code}
> Please note that the values of field f2 are lost after the union. 
> This only happens when the data is read from Parquet files. 
> Could you please look into this? 
> Best regards,
> Jakub


