[jira] [Updated] (SPARK-39380) Ignore comment syntax in dfs command

2022-06-04 Thread Chen Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Zhang updated SPARK-39380:
---
Description: 
In version 3.2, Spark SQL ignores semicolons that appear inside comment syntax when splitting input lines (related changes: SPARK-33100, SPARK-30049).

Because a dfs command argument can contain tokens that look like comment syntax (for example `--` in a file name or `/*` in a glob), the trailing semicolon is no longer treated as a statement terminator; it is passed to the dfs command, which then fails unexpectedly:


{code:bash}
mkdir testDfsCommand
echo 'abc' > testDfsCommand/a---.txt

./spark-2.4.8-bin-hadoop2.7/bin/spark-sql -e 'dfs -cat testDfsCommand/a---.txt;'
# abc
./spark-2.4.8-bin-hadoop2.7/bin/spark-sql -e 'dfs -cat testDfsCommand/*;'
# abc

./spark-3.2.1-bin-hadoop2.7/bin/spark-sql -e 'dfs -cat testDfsCommand/a---.txt;'
# cat: `testDfsCommand/a---.txt;': No such file or directory
# Command -cat testDfsCommand/a---.txt; failed with exit code = 1
./spark-3.2.1-bin-hadoop2.7/bin/spark-sql -e 'dfs -cat testDfsCommand/*;'
# cat: `testDfsCommand/*;': No such file or directory
# Command -cat testDfsCommand/*; failed with exit code = 1
{code}


  was:
In version 3.2, Spark SQL ignores semicolons in comment syntax when splitting 
input lines. 

This behavior throws an unexpected dfs command error:

(Related PR: SPARK-33100 SPARK-30049)
mkdir testDfsCommand
echo 'abc' > testDfsCommand/a---.txt
​
./spark-2.4.8-bin-hadoop2.7/bin/spark-sql -e 'dfs -cat testDfsCommand/a---.txt;'
# abc
./spark-2.4.8-bin-hadoop2.7/bin/spark-sql -e 'dfs -cat testDfsCommand/*;'
# abc
​
./spark-3.2.1-bin-hadoop2.7/bin/spark-sql -e 'dfs -cat testDfsCommand/a---.txt;'
# cat: `testDfsCommand/a---.txt;': No such file or directory
# Command -cat testDfsCommand/a---.txt; failed with exit code = 1
./spark-3.2.1-bin-hadoop2.7/bin/spark-sql -e 'dfs -cat testDfsCommand/*;'
# cat: `testDfsCommand/*;': No such file or directory
# Command -cat testDfsCommand/*; failed with exit code = 1


> Ignore comment syntax in dfs command
> 
>
> Key: SPARK-39380
> URL: https://issues.apache.org/jira/browse/SPARK-39380
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.1
>Reporter: Chen Zhang
>Priority: Major
>
> In version 3.2, Spark SQL ignores semicolons that appear inside comment syntax when splitting input lines (related changes: SPARK-33100, SPARK-30049).
> Because a dfs command argument can contain tokens that look like comment syntax (for example `--` in a file name or `/*` in a glob), the trailing semicolon is no longer treated as a statement terminator; it is passed to the dfs command, which then fails unexpectedly:
> {code:bash}
> mkdir testDfsCommand
> echo 'abc' > testDfsCommand/a---.txt
> ./spark-2.4.8-bin-hadoop2.7/bin/spark-sql -e 'dfs -cat 
> testDfsCommand/a---.txt;'
> # abc
> ./spark-2.4.8-bin-hadoop2.7/bin/spark-sql -e 'dfs -cat testDfsCommand/*;'
> # abc
> ./spark-3.2.1-bin-hadoop2.7/bin/spark-sql -e 'dfs -cat 
> testDfsCommand/a---.txt;'
> # cat: `testDfsCommand/a---.txt;': No such file or directory
> # Command -cat testDfsCommand/a---.txt; failed with exit code = 1
> ./spark-3.2.1-bin-hadoop2.7/bin/spark-sql -e 'dfs -cat testDfsCommand/*;'
> # cat: `testDfsCommand/*;': No such file or directory
> # Command -cat testDfsCommand/*; failed with exit code = 1
> {code}






[jira] [Created] (SPARK-39380) Ignore comment syntax in dfs command

2022-06-04 Thread Chen Zhang (Jira)
Chen Zhang created SPARK-39380:
--

 Summary: Ignore comment syntax in dfs command
 Key: SPARK-39380
 URL: https://issues.apache.org/jira/browse/SPARK-39380
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.1
Reporter: Chen Zhang


In version 3.2, Spark SQL ignores semicolons in comment syntax when splitting 
input lines. 

This behavior throws an unexpected dfs command error:

(Related PR: SPARK-33100 SPARK-30049)
mkdir testDfsCommand
echo 'abc' > testDfsCommand/a---.txt
​
./spark-2.4.8-bin-hadoop2.7/bin/spark-sql -e 'dfs -cat testDfsCommand/a---.txt;'
# abc
./spark-2.4.8-bin-hadoop2.7/bin/spark-sql -e 'dfs -cat testDfsCommand/*;'
# abc
​
./spark-3.2.1-bin-hadoop2.7/bin/spark-sql -e 'dfs -cat testDfsCommand/a---.txt;'
# cat: `testDfsCommand/a---.txt;': No such file or directory
# Command -cat testDfsCommand/a---.txt; failed with exit code = 1
./spark-3.2.1-bin-hadoop2.7/bin/spark-sql -e 'dfs -cat testDfsCommand/*;'
# cat: `testDfsCommand/*;': No such file or directory
# Command -cat testDfsCommand/*; failed with exit code = 1






[jira] [Commented] (SPARK-32978) Incorrect number of dynamic part metric

2020-09-24 Thread Chen Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17201516#comment-17201516
 ] 

Chen Zhang commented on SPARK-32978:


Hello, [~yumwang]

I ran this code with Spark using the default configuration and failed to reproduce this issue. The metrics I observed were:
{code:none}
number of written files: 50
written output: 55.4 KiB
number of output rows: 10,000
number of dynamic part: 50
{code}

> Incorrect number of dynamic part metric
> ---
>
> Key: SPARK-32978
> URL: https://issues.apache.org/jira/browse/SPARK-32978
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
> Attachments: screenshot-1.png
>
>
> How to reproduce this issue:
> {code:sql}
> create table dynamic_partition(i bigint, part bigint) using parquet 
> partitioned by (part);
> insert overwrite table dynamic_partition partition(part) select id, id % 50 
> as part  from range(1);
> {code}
> The number of dynamic part should be 50, but it is 800.






[jira] [Commented] (SPARK-32956) Duplicate Columns in a csv file

2020-09-22 Thread Chen Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17200382#comment-17200382
 ] 

Chen Zhang commented on SPARK-32956:


Okay, I will submit a PR later.

> Duplicate Columns in a csv file
> ---
>
> Key: SPARK-32956
> URL: https://issues.apache.org/jira/browse/SPARK-32956
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.3, 2.4.4, 2.4.5, 2.4.6, 2.4.7, 3.0.0, 3.0.1
>Reporter: Punit Shah
>Priority: Major
>
> Imagine a csv file shaped like:
> 
> Id,Product,Sale_Amount,Sale_Units,Sale_Amount2,Sale_Amount,Sale_Price
> 1,P,"6,40,728","6,40,728","6,40,728","6,40,728","6,40,728"
> 2,P,"5,81,644","5,81,644","5,81,644","5,81,644","5,81,644"
> =
> Reading this with the header=True will result in a stacktrace.
>  






[jira] [Updated] (SPARK-32956) Duplicate Columns in a csv file

2020-09-22 Thread Chen Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Zhang updated SPARK-32956:
---
Component/s: (was: Spark Core)
 SQL

> Duplicate Columns in a csv file
> ---
>
> Key: SPARK-32956
> URL: https://issues.apache.org/jira/browse/SPARK-32956
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.3, 2.4.4, 2.4.5, 2.4.6, 2.4.7, 3.0.0, 3.0.1
>Reporter: Punit Shah
>Priority: Major
>
> Imagine a csv file shaped like:
> 
> Id,Product,Sale_Amount,Sale_Units,Sale_Amount2,Sale_Amount,Sale_Price
> 1,P,"6,40,728","6,40,728","6,40,728","6,40,728","6,40,728"
> 2,P,"5,81,644","5,81,644","5,81,644","5,81,644","5,81,644"
> =
> Reading this with the header=True will result in a stacktrace.
>  






[jira] [Commented] (SPARK-32956) Duplicate Columns in a csv file

2020-09-22 Thread Chen Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17200092#comment-17200092
 ] 

Chen Zhang commented on SPARK-32956:


In SPARK-16896, when the CSV data has duplicate column headers, the column index is appended as a suffix to disambiguate them.

In this case, _Sale_Amount_ is a duplicate column header.
Original column header:
{code:none}
Id, Product, Sale_Amount, Sale_Units, Sale_Amount2, Sale_Amount, Sale_Price{code}
Column header after adding the index suffix:
{code:none}
Id, Product, Sale_Amount2, Sale_Units, Sale_Amount2, Sale_Amount5, Sale_Price{code}
The generated _Sale_Amount2_ still collides with the existing _Sale_Amount2_ column header.

Maybe we can apply the suffix again whenever a new duplicate column header appears (see the sketch below):
{code:none}
Id, Product, Sale_Amount22, Sale_Units, Sale_Amount24, Sale_Amount5, Sale_Price{code}
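
As an illustration only, here is a minimal sketch of that idea in plain Scala (a hypothetical helper, not the actual CSV header code in Spark): repeat the index-suffix pass until no duplicates remain.
{code:scala}
object DedupHeaders {
  // One pass of the existing behaviour: append the column index to every name
  // that occurs more than once in the current header row.
  private def dedupOnce(headers: Seq[String]): Seq[String] = {
    val counts = headers.groupBy(identity).map { case (name, group) => name -> group.size }
    headers.zipWithIndex.map { case (name, idx) =>
      if (counts(name) > 1) name + idx else name
    }
  }

  // Proposed behaviour: keep applying the pass while the renaming still leaves duplicates.
  def deduplicate(headers: Seq[String]): Seq[String] = {
    var current = headers
    while (current.distinct.size != current.size) {
      current = dedupOnce(current)
    }
    current
  }

  def main(args: Array[String]): Unit = {
    val input = Seq("Id", "Product", "Sale_Amount", "Sale_Units",
      "Sale_Amount2", "Sale_Amount", "Sale_Price")
    println(dedupOnce(input).mkString(", "))
    // Id, Product, Sale_Amount2, Sale_Units, Sale_Amount2, Sale_Amount5, Sale_Price
    println(deduplicate(input).mkString(", "))
    // Id, Product, Sale_Amount22, Sale_Units, Sale_Amount24, Sale_Amount5, Sale_Price
  }
}
{code}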

> Duplicate Columns in a csv file
> ---
>
> Key: SPARK-32956
> URL: https://issues.apache.org/jira/browse/SPARK-32956
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.4.3, 2.4.4, 2.4.5, 2.4.6, 2.4.7, 3.0.0, 3.0.1
>Reporter: Punit Shah
>Priority: Major
>
> Imagine a csv file shaped like:
> 
> Id,Product,Sale_Amount,Sale_Units,Sale_Amount2,Sale_Amount,Sale_Price
> 1,P,"6,40,728","6,40,728","6,40,728","6,40,728","6,40,728"
> 2,P,"5,81,644","5,81,644","5,81,644","5,81,644","5,81,644"
> =
> Reading this with the header=True will result in a stacktrace.
>  






[jira] [Updated] (SPARK-32317) Parquet file loading with different schema(Decimal(N, P)) in files is not working as expected

2020-09-02 Thread Chen Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Zhang updated SPARK-32317:
---
Labels:   (was: easyfix)

> Parquet file loading with different schema(Decimal(N, P)) in files is not 
> working as expected
> -
>
> Key: SPARK-32317
> URL: https://issues.apache.org/jira/browse/SPARK-32317
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
> Environment: Its failing in all environments that I tried.
>Reporter: Krish
>Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Hi,
>  
> We generate parquet files which are partitioned on Date on a daily basis, and 
> we send updates to historical data some times, what we noticed is due to some 
> configuration error the patch data schema is inconsistent to earlier files.
> Assuming we had files generated with schema having ID and Amount as fields. 
> Historical data is having schema like ID INT, AMOUNT DECIMAL(15,6) and the 
> files we send as updates has schema like DECIMAL(15,2). 
>  
> Having two different schema in a Date partition and when we load the data of 
> a Date into spark, it is loading the data but the amount is getting 
> manipulated.
>  
> file1.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,6)
>  Content:
>  1,19500.00
>  2,198.34
> file2.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,2)
>  Content:
>  1,19500.00
>  3,198.34
> Load these two files togeather
> df3 = spark.read.parquet("output/")
> df3.show() #-we can see amount getting manipulated here,
> +-+---+
> |ID|   AMOUNT|
> +-+---+
> |1|1.95|
> |3|0.019834|
> |1|19500.00|
> |2|198.34|
> +-+---+
>  x
> Options Tried:
> We tried to give schema as String for all fields, but that didt work
> df3 = spark.read.format("parquet").schema(schema).load("output/")
> Error: "org.apache.spark.sql.execution.QueryExecutionException: Parquet 
> column cannot be converted in file file*.snappy.parquet. Column: 
> [AMOUNT], Expected: string, Found: INT64"
>  
> I know merge schema works if it finds few extra columns in one file but the 
> fileds which are in common needs to have same schema. That might nort work 
> here.
>  
> Looking for some work around solution here. Or if there is an option which I 
> havent tried you can point me to that.
>  
> With schema merging I got below eeror:
> An error occurred while calling o2272.parquet. : 
> org.apache.spark.SparkException: Failed merging schema: root |-- ID: string 
> (nullable = true) |-- AMOUNT: decimal(15,6) (nullable = true) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
>  at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:95)
>  at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) 
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:95)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:485)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:107)
>  at 
> org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable.inferSchema(ParquetTable.scala:44)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.$anonfun$dataSchema$4(FileTable.scala:69)
>  at scala.Option.orElse(Option.scala:447) at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema$lzycompute(FileTable.scala:69)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema(FileTable.scala:63)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.schema$lzycompute(FileTable.scala:82)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.schema(FileTable.scala:80)
>  at 
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:141)
>  at 
> org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:225)
>  at scala.Option.map(Option.scala:230) at 
> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:206) at 
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:674) at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> at 
> 

[jira] [Updated] (SPARK-32317) Parquet file loading with different schema(Decimal(N, P)) in files is not working as expected

2020-09-02 Thread Chen Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Zhang updated SPARK-32317:
---
Component/s: (was: PySpark)
 SQL

> Parquet file loading with different schema(Decimal(N, P)) in files is not 
> working as expected
> -
>
> Key: SPARK-32317
> URL: https://issues.apache.org/jira/browse/SPARK-32317
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
> Environment: Its failing in all environments that I tried.
>Reporter: Krish
>Priority: Major
>  Labels: easyfix
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Hi,
>  
> We generate parquet files which are partitioned on Date on a daily basis, and 
> we send updates to historical data some times, what we noticed is due to some 
> configuration error the patch data schema is inconsistent to earlier files.
> Assuming we had files generated with schema having ID and Amount as fields. 
> Historical data is having schema like ID INT, AMOUNT DECIMAL(15,6) and the 
> files we send as updates has schema like DECIMAL(15,2). 
>  
> Having two different schema in a Date partition and when we load the data of 
> a Date into spark, it is loading the data but the amount is getting 
> manipulated.
>  
> file1.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,6)
>  Content:
>  1,19500.00
>  2,198.34
> file2.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,2)
>  Content:
>  1,19500.00
>  3,198.34
> Load these two files togeather
> df3 = spark.read.parquet("output/")
> df3.show() #-we can see amount getting manipulated here,
> +-+---+
> |ID|   AMOUNT|
> +-+---+
> |1|1.95|
> |3|0.019834|
> |1|19500.00|
> |2|198.34|
> +-+---+
>  x
> Options Tried:
> We tried to give schema as String for all fields, but that didt work
> df3 = spark.read.format("parquet").schema(schema).load("output/")
> Error: "org.apache.spark.sql.execution.QueryExecutionException: Parquet 
> column cannot be converted in file file*.snappy.parquet. Column: 
> [AMOUNT], Expected: string, Found: INT64"
>  
> I know merge schema works if it finds few extra columns in one file but the 
> fileds which are in common needs to have same schema. That might nort work 
> here.
>  
> Looking for some work around solution here. Or if there is an option which I 
> havent tried you can point me to that.
>  
> With schema merging I got below eeror:
> An error occurred while calling o2272.parquet. : 
> org.apache.spark.SparkException: Failed merging schema: root |-- ID: string 
> (nullable = true) |-- AMOUNT: decimal(15,6) (nullable = true) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
>  at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:95)
>  at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) 
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:95)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:485)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:107)
>  at 
> org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable.inferSchema(ParquetTable.scala:44)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.$anonfun$dataSchema$4(FileTable.scala:69)
>  at scala.Option.orElse(Option.scala:447) at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema$lzycompute(FileTable.scala:69)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema(FileTable.scala:63)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.schema$lzycompute(FileTable.scala:82)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.schema(FileTable.scala:80)
>  at 
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:141)
>  at 
> org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:225)
>  at scala.Option.map(Option.scala:230) at 
> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:206) at 
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:674) at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
> at 
> 

[jira] [Updated] (SPARK-32639) Support GroupType parquet mapkey field

2020-08-17 Thread Chen Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Zhang updated SPARK-32639:
---
Attachment: 000.snappy.parquet

> Support GroupType parquet mapkey field
> --
>
> Key: SPARK-32639
> URL: https://issues.apache.org/jira/browse/SPARK-32639
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.6, 3.0.0
>Reporter: Chen Zhang
>Priority: Major
> Attachments: 000.snappy.parquet
>
>
> I have a parquet file, and the MessageType recorded in the file is:
> {code:java}
> message parquet_schema {
>   optional group value (MAP) {
> repeated group key_value {
>   required group key {
> optional binary first (UTF8);
> optional binary middle (UTF8);
> optional binary last (UTF8);
>   }
>   optional binary value (UTF8);
> }
>   }
> }{code}
>  
> Use +spark.read.parquet("000.snappy.parquet")+ to read the file. Spark will 
> throw an exception when converting Parquet MessageType to Spark SQL 
> StructType:
> {code:java}
> AssertionError(Map key type is expected to be a primitive type, but found...)
> {code}
>  
> Use +spark.read.schema("value MAP last:STRING>, STRING>").parquet("000.snappy.parquet")+ to read the file, 
> spark returns the correct result .
> According to the parquet project document 
> (https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps), 
> the mapKey in the parquet format does not need to be a primitive type.
>  
> Note: this parquet file was not written by Spark. Spark writes an additional sparkSchema string into the parquet files it produces, and on read it uses that sparkSchema directly instead of converting the Parquet MessageType to a Spark SQL StructType.
> I will submit a PR later.






[jira] [Created] (SPARK-32639) Support GroupType parquet mapkey field

2020-08-17 Thread Chen Zhang (Jira)
Chen Zhang created SPARK-32639:
--

 Summary: Support GroupType parquet mapkey field
 Key: SPARK-32639
 URL: https://issues.apache.org/jira/browse/SPARK-32639
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0, 2.4.6
Reporter: Chen Zhang


I have a parquet file, and the MessageType recorded in the file is:
{code:java}
message parquet_schema {
  optional group value (MAP) {
repeated group key_value {
  required group key {
optional binary first (UTF8);
optional binary middle (UTF8);
optional binary last (UTF8);
  }
  optional binary value (UTF8);
}
  }
}{code}
 

Use +spark.read.parquet("000.snappy.parquet")+ to read the file. Spark will 
throw an exception when converting Parquet MessageType to Spark SQL StructType:
{code:java}
AssertionError(Map key type is expected to be a primitive type, but found...)
{code}
 

Use +spark.read.schema("value MAP, STRING>").parquet("000.snappy.parquet")+ to read the file, spark 
returns the correct result .

According to the parquet project document 
(https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps), 
the mapKey in the parquet format does not need to be a primitive type.

 
Note: this parquet file was not written by Spark. Spark writes an additional sparkSchema string into the parquet files it produces, and on read it uses that sparkSchema directly instead of converting the Parquet MessageType to a Spark SQL StructType.


I will submit a PR later.
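
For reference, a minimal sketch of the two reads described above (the DDL schema string is reconstructed from the MessageType shown and should be treated as an assumption):
{code:scala}
import org.apache.spark.sql.SparkSession

object GroupTypeMapKeyRead {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("GroupTypeMapKeyRead")
      .master("local[*]")
      .getOrCreate()

    // Letting Spark infer the schema fails while converting the Parquet MessageType
    // to a Spark SQL StructType:
    //   AssertionError: Map key type is expected to be a primitive type, but found ...
    // val failing = spark.read.parquet("000.snappy.parquet")

    // Supplying the schema explicitly skips that conversion and returns the data.
    val df = spark.read
      .schema("value MAP<STRUCT<first: STRING, middle: STRING, last: STRING>, STRING>")
      .parquet("000.snappy.parquet")
    df.show(truncate = false)

    spark.stop()
  }
}
{code}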






[jira] [Commented] (SPARK-32317) Parquet file loading with different schema(Decimal(N, P)) in files is not working as expected

2020-07-19 Thread Chen Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160786#comment-17160786
 ] 

Chen Zhang commented on SPARK-32317:


Spark uses requiredSchema to convert the read parquet file data. It does not 
consider that the requiredSchema and the schema stored in the file may be 
different. This may potentially cause data correctness problems.

I think it is necessary to consider the mapping between the requiredSchema and 
the schema stored in the file. I will try to modify the code, and if it goes 
well, I can submit a PR.

> Parquet file loading with different schema(Decimal(N, P)) in files is not 
> working as expected
> -
>
> Key: SPARK-32317
> URL: https://issues.apache.org/jira/browse/SPARK-32317
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.0
> Environment: Its failing in all environments that I tried.
>Reporter: Krish
>Priority: Major
>  Labels: easyfix
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Hi,
>  
> We generate parquet files which are partitioned on Date on a daily basis, and 
> we send updates to historical data some times, what we noticed is due to some 
> configuration error the patch data schema is inconsistent to earlier files.
> Assuming we had files generated with schema having ID and Amount as fields. 
> Historical data is having schema like ID INT, AMOUNT DECIMAL(15,6) and the 
> files we send as updates has schema like DECIMAL(15,2). 
>  
> Having two different schema in a Date partition and when we load the data of 
> a Date into spark, it is loading the data but the amount is getting 
> manipulated.
>  
> file1.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,6)
>  Content:
>  1,19500.00
>  2,198.34
> file2.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,2)
>  Content:
>  1,19500.00
>  3,198.34
> Load these two files togeather
> df3 = spark.read.parquet("output/")
> df3.show() #-we can see amount getting manipulated here,
> +-+---+
> |ID|   AMOUNT|
> +-+---+
> |1|1.95|
> |3|0.019834|
> |1|19500.00|
> |2|198.34|
> +-+---+
>  x
> Options Tried:
> We tried to give schema as String for all fields, but that didt work
> df3 = spark.read.format("parquet").schema(schema).load("output/")
> Error: "org.apache.spark.sql.execution.QueryExecutionException: Parquet 
> column cannot be converted in file file*.snappy.parquet. Column: 
> [AMOUNT], Expected: string, Found: INT64"
>  
> I know merge schema works if it finds few extra columns in one file but the 
> fileds which are in common needs to have same schema. That might nort work 
> here.
>  
> Looking for some work around solution here. Or if there is an option which I 
> havent tried you can point me to that.
>  
> With schema merging I got below eeror:
> An error occurred while calling o2272.parquet. : 
> org.apache.spark.SparkException: Failed merging schema: root |-- ID: string 
> (nullable = true) |-- AMOUNT: decimal(15,6) (nullable = true) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
>  at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:95)
>  at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) 
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:95)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:485)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:107)
>  at 
> org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable.inferSchema(ParquetTable.scala:44)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.$anonfun$dataSchema$4(FileTable.scala:69)
>  at scala.Option.orElse(Option.scala:447) at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema$lzycompute(FileTable.scala:69)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema(FileTable.scala:63)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.schema$lzycompute(FileTable.scala:82)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.schema(FileTable.scala:80)
>  at 
> org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:141)
>  at 
> 

[jira] [Comment Edited] (SPARK-32317) Parquet file loading with different schema(Decimal(N, P)) in files is not working as expected

2020-07-19 Thread Chen Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160207#comment-17160207
 ] 

Chen Zhang edited comment on SPARK-32317 at 7/19/20, 6:40 PM:
--

The DECIMAL type in Parquet is stored by INT32, INT64, FIXED_LEN_BYTE_ARRAY, or 
BINARY.

Taking 19500.00 stored as INT64 as an example: written as DECIMAL(15,2), the file stores the unscaled value 1950000; written as DECIMAL(15,6), it stores 19500000000.

 


was (Author: chen zhang):
The DECIMAL type in Parquet is stored by INT32, INT64, FIXED_LEN_BYTE_ARRAY, or 
BINARY.

Taking 19500.00 stored as INT64 as an example: written as DECIMAL(15,2), the file stores the unscaled value 1950000; written as DECIMAL(15,6), it stores 19500000000.

Spark uses requiredSchema to convert the read parquet file data. It does not 
consider that the requiredSchema and the schema stored in the file may be 
different. This may potentially cause data correctness problems.

I think it is necessary to consider the mapping between the requiredSchema and 
the schema stored in the file. I will try to modify the code, and if it goes 
well, I can submit a PR.

> Parquet file loading with different schema(Decimal(N, P)) in files is not 
> working as expected
> -
>
> Key: SPARK-32317
> URL: https://issues.apache.org/jira/browse/SPARK-32317
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.0
> Environment: Its failing in all environments that I tried.
>Reporter: Krish
>Priority: Major
>  Labels: easyfix
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Hi,
>  
> We generate parquet files which are partitioned on Date on a daily basis, and 
> we send updates to historical data some times, what we noticed is due to some 
> configuration error the patch data schema is inconsistent to earlier files.
> Assuming we had files generated with schema having ID and Amount as fields. 
> Historical data is having schema like ID INT, AMOUNT DECIMAL(15,6) and the 
> files we send as updates has schema like DECIMAL(15,2). 
>  
> Having two different schema in a Date partition and when we load the data of 
> a Date into spark, it is loading the data but the amount is getting 
> manipulated.
>  
> file1.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,6)
>  Content:
>  1,19500.00
>  2,198.34
> file2.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,2)
>  Content:
>  1,19500.00
>  3,198.34
> Load these two files togeather
> df3 = spark.read.parquet("output/")
> df3.show() #-we can see amount getting manipulated here,
> +-+---+
> |ID|   AMOUNT|
> +-+---+
> |1|1.95|
> |3|0.019834|
> |1|19500.00|
> |2|198.34|
> +-+---+
>  x
> Options Tried:
> We tried to give schema as String for all fields, but that didt work
> df3 = spark.read.format("parquet").schema(schema).load("output/")
> Error: "org.apache.spark.sql.execution.QueryExecutionException: Parquet 
> column cannot be converted in file file*.snappy.parquet. Column: 
> [AMOUNT], Expected: string, Found: INT64"
>  
> I know merge schema works if it finds few extra columns in one file but the 
> fileds which are in common needs to have same schema. That might nort work 
> here.
>  
> Looking for some work around solution here. Or if there is an option which I 
> havent tried you can point me to that.
>  
> With schema merging I got below eeror:
> An error occurred while calling o2272.parquet. : 
> org.apache.spark.SparkException: Failed merging schema: root |-- ID: string 
> (nullable = true) |-- AMOUNT: decimal(15,6) (nullable = true) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
>  at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:95)
>  at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) 
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:95)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:485)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:107)
>  at 
> org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable.inferSchema(ParquetTable.scala:44)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.$anonfun$dataSchema$4(FileTable.scala:69)
>  at scala.Option.orElse(Option.scala:447) at 
> 

[jira] [Comment Edited] (SPARK-32317) Parquet file loading with different schema(Decimal(N, P)) in files is not working as expected

2020-07-19 Thread Chen Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160207#comment-17160207
 ] 

Chen Zhang edited comment on SPARK-32317 at 7/19/20, 6:38 PM:
--

The DECIMAL type in Parquet is stored by INT32, INT64, FIXED_LEN_BYTE_ARRAY, or 
BINARY.

Taking 19500.00 stored as INT64 as an example: written as DECIMAL(15,2), the file stores the unscaled value 1950000; written as DECIMAL(15,6), it stores 19500000000.

Spark uses requiredSchema to convert the read parquet file data. It does not 
consider that the requiredSchema and the schema stored in the file may be 
different. This may potentially cause data correctness problems.

I think it is necessary to consider the mapping between the requiredSchema and 
the schema stored in the file. I will try to modify the code, and if it goes 
well, I can submit a PR.


was (Author: chen zhang):
The DECIMAL type in Parquet is stored by INT32, INT64, FIXED_LEN_BYTE_ARRAY, or 
BINARY.

Taking 19500.00 stored as INT64 as an example: written as DECIMAL(15,2), the file stores the unscaled value 1950000; written as DECIMAL(15,6), it stores 19500000000.

Spark uses requiredSchema to convert the read parquet file data. It does not 
consider that the requiredSchema and the schema stored in the file may be 
different. This may potentially cause data correctness problems.

I think it is necessary to consider the mapping between the requiredSchema and 
the schema stored in the file. I will try to modify the code, and if it goes 
well, I can submit a PR.

> Parquet file loading with different schema(Decimal(N, P)) in files is not 
> working as expected
> -
>
> Key: SPARK-32317
> URL: https://issues.apache.org/jira/browse/SPARK-32317
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.0
> Environment: Its failing in all environments that I tried.
>Reporter: Krish
>Priority: Major
>  Labels: easyfix
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Hi,
>  
> We generate parquet files which are partitioned on Date on a daily basis, and 
> we send updates to historical data some times, what we noticed is due to some 
> configuration error the patch data schema is inconsistent to earlier files.
> Assuming we had files generated with schema having ID and Amount as fields. 
> Historical data is having schema like ID INT, AMOUNT DECIMAL(15,6) and the 
> files we send as updates has schema like DECIMAL(15,2). 
>  
> Having two different schema in a Date partition and when we load the data of 
> a Date into spark, it is loading the data but the amount is getting 
> manipulated.
>  
> file1.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,6)
>  Content:
>  1,19500.00
>  2,198.34
> file2.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,2)
>  Content:
>  1,19500.00
>  3,198.34
> Load these two files togeather
> df3 = spark.read.parquet("output/")
> df3.show() #-we can see amount getting manipulated here,
> +-+---+
> |ID|   AMOUNT|
> +-+---+
> |1|1.95|
> |3|0.019834|
> |1|19500.00|
> |2|198.34|
> +-+---+
>  x
> Options Tried:
> We tried to give schema as String for all fields, but that didt work
> df3 = spark.read.format("parquet").schema(schema).load("output/")
> Error: "org.apache.spark.sql.execution.QueryExecutionException: Parquet 
> column cannot be converted in file file*.snappy.parquet. Column: 
> [AMOUNT], Expected: string, Found: INT64"
>  
> I know merge schema works if it finds few extra columns in one file but the 
> fileds which are in common needs to have same schema. That might nort work 
> here.
>  
> Looking for some work around solution here. Or if there is an option which I 
> havent tried you can point me to that.
>  
> With schema merging I got below eeror:
> An error occurred while calling o2272.parquet. : 
> org.apache.spark.SparkException: Failed merging schema: root |-- ID: string 
> (nullable = true) |-- AMOUNT: decimal(15,6) (nullable = true) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
>  at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:95)
>  at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) 
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:95)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:485)
>  

[jira] [Comment Edited] (SPARK-32317) Parquet file loading with different schema(Decimal(N, P)) in files is not working as expected

2020-07-19 Thread Chen Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160207#comment-17160207
 ] 

Chen Zhang edited comment on SPARK-32317 at 7/19/20, 6:10 PM:
--

The DECIMAL type in Parquet is stored by INT32, INT64, FIXED_LEN_BYTE_ARRAY, or 
BINARY.

Taking 19500.00 stored as INT64 as an example: written as DECIMAL(15,2), the file stores the unscaled value 1950000; written as DECIMAL(15,6), it stores 19500000000.

Spark uses requiredSchema to convert the read parquet file data. It does not 
consider that the requiredSchema and the schema stored in the file may be 
different. This may potentially cause data correctness problems.

I think it is necessary to consider the mapping between the requiredSchema and 
the schema stored in the file. I will try to modify the code, and if it goes 
well, I can submit a PR.


was (Author: chen zhang):
The DECIMAL type in Parquet is stored by INT32, INT64, FIXED_LEN_BYTE_ARRAY, or 
BINARY.

Taking 19500.00 stored as INT64 as an example: written as DECIMAL(15,2), the file stores the unscaled value 1950000; written as DECIMAL(15,6), it stores 19500000000.

I see that the Spark source code org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter uses catalystType to convert the decimal type. Maybe we should use the schema stored in the parquet file for the conversion instead.

something like:
{code:java}
//case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
//  new ParquetIntDictionaryAwareDecimalConverter(t.precision, t.scale, updater)
case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
  val mate = parquetType.asPrimitiveType().getDecimalMetadata()
  new ParquetLongDictionaryAwareDecimalConverter(mate.getPrecision, mate.getScale, updater)
{code}
I will do some validation later. 

> Parquet file loading with different schema(Decimal(N, P)) in files is not 
> working as expected
> -
>
> Key: SPARK-32317
> URL: https://issues.apache.org/jira/browse/SPARK-32317
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.0
> Environment: Its failing in all environments that I tried.
>Reporter: Krish
>Priority: Major
>  Labels: easyfix
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Hi,
>  
> We generate parquet files which are partitioned on Date on a daily basis, and 
> we send updates to historical data some times, what we noticed is due to some 
> configuration error the patch data schema is inconsistent to earlier files.
> Assuming we had files generated with schema having ID and Amount as fields. 
> Historical data is having schema like ID INT, AMOUNT DECIMAL(15,6) and the 
> files we send as updates has schema like DECIMAL(15,2). 
>  
> Having two different schema in a Date partition and when we load the data of 
> a Date into spark, it is loading the data but the amount is getting 
> manipulated.
>  
> file1.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,6)
>  Content:
>  1,19500.00
>  2,198.34
> file2.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,2)
>  Content:
>  1,19500.00
>  3,198.34
> Load these two files togeather
> df3 = spark.read.parquet("output/")
> df3.show() #-we can see amount getting manipulated here,
> +-+---+
> |ID|   AMOUNT|
> +-+---+
> |1|1.95|
> |3|0.019834|
> |1|19500.00|
> |2|198.34|
> +-+---+
>  x
> Options Tried:
> We tried to give schema as String for all fields, but that didt work
> df3 = spark.read.format("parquet").schema(schema).load("output/")
> Error: "org.apache.spark.sql.execution.QueryExecutionException: Parquet 
> column cannot be converted in file file*.snappy.parquet. Column: 
> [AMOUNT], Expected: string, Found: INT64"
>  
> I know merge schema works if it finds few extra columns in one file but the 
> fileds which are in common needs to have same schema. That might nort work 
> here.
>  
> Looking for some work around solution here. Or if there is an option which I 
> havent tried you can point me to that.
>  
> With schema merging I got below eeror:
> An error occurred while calling o2272.parquet. : 
> org.apache.spark.SparkException: Failed merging schema: root |-- ID: string 
> (nullable = true) |-- AMOUNT: decimal(15,6) (nullable = true) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
>  at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:95)
>  at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) 
> at 

[jira] [Commented] (SPARK-32317) Parquet file loading with different schema(Decimal(N, P)) in files is not working as expected

2020-07-17 Thread Chen Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160207#comment-17160207
 ] 

Chen Zhang commented on SPARK-32317:


The DECIMAL type in Parquet is stored by INT32, INT64, FIXED_LEN_BYTE_ARRAY, or 
BINARY.

Taking 19500.00 stored as INT64 as an example: written as DECIMAL(15,2), the file stores the unscaled value 1950000; written as DECIMAL(15,6), it stores 19500000000.

I see that the Spark source code org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter uses catalystType to convert the decimal type. Maybe we should use the schema stored in the parquet file for the conversion instead.

something like:
{code:java}
//case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
//  new ParquetIntDictionaryAwareDecimalConverter(t.precision, t.scale, updater)
case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
  val mate = parquetType.asPrimitiveType().getDecimalMetadata()
  new ParquetLongDictionaryAwareDecimalConverter(mate.getPrecision, mate.getScale, updater)
{code}
I will do some validation later. 
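
To make the effect of the scale mismatch concrete, here is a minimal sketch in plain Scala (no Spark required) showing how the same INT64 unscaled value is reinterpreted when the reader assumes a different scale than the writer used; the result matches the 1.95 shown for ID 1 in df3.show() above:
{code:scala}
import java.math.{BigDecimal => JBigDecimal, BigInteger}

object DecimalScaleMismatch {
  def main(args: Array[String]): Unit = {
    // 19500.00 written as DECIMAL(15,2) is stored as the unscaled INT64 value 1950000.
    val unscaled = BigInteger.valueOf(1950000L)

    // Reading it back with the scale it was written with recovers the original value.
    println(new JBigDecimal(unscaled, 2)) // 19500.00

    // Reading the same unscaled value as if it were DECIMAL(15,6) silently shifts
    // the decimal point.
    println(new JBigDecimal(unscaled, 6)) // 1.950000
  }
}
{code}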

> Parquet file loading with different schema(Decimal(N, P)) in files is not 
> working as expected
> -
>
> Key: SPARK-32317
> URL: https://issues.apache.org/jira/browse/SPARK-32317
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.0
> Environment: Its failing in all environments that I tried.
>Reporter: Krish
>Priority: Major
>  Labels: easyfix
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Hi,
>  
> We generate parquet files which are partitioned on Date on a daily basis, and 
> we send updates to historical data some times, what we noticed is due to some 
> configuration error the patch data schema is inconsistent to earlier files.
> Assuming we had files generated with schema having ID and Amount as fields. 
> Historical data is having schema like ID INT, AMOUNT DECIMAL(15,6) and the 
> files we send as updates has schema like DECIMAL(15,2). 
>  
> Having two different schema in a Date partition and when we load the data of 
> a Date into spark, it is loading the data but the amount is getting 
> manipulated.
>  
> file1.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,6)
>  Content:
>  1,19500.00
>  2,198.34
> file2.snappy.parquet
>  ID: INT
>  AMOUNT: DECIMAL(15,2)
>  Content:
>  1,19500.00
>  3,198.34
> Load these two files togeather
> df3 = spark.read.parquet("output/")
> df3.show() #-we can see amount getting manipulated here,
> +-+---+
> |ID|   AMOUNT|
> +-+---+
> |1|1.95|
> |3|0.019834|
> |1|19500.00|
> |2|198.34|
> +-+---+
>  x
> Options Tried:
> We tried to give schema as String for all fields, but that didt work
> df3 = spark.read.format("parquet").schema(schema).load("output/")
> Error: "org.apache.spark.sql.execution.QueryExecutionException: Parquet 
> column cannot be converted in file file*.snappy.parquet. Column: 
> [AMOUNT], Expected: string, Found: INT64"
>  
> I know merge schema works if it finds few extra columns in one file but the 
> fileds which are in common needs to have same schema. That might nort work 
> here.
>  
> Looking for some work around solution here. Or if there is an option which I 
> havent tried you can point me to that.
>  
> With schema merging I got below eeror:
> An error occurred while calling o2272.parquet. : 
> org.apache.spark.SparkException: Failed merging schema: root |-- ID: string 
> (nullable = true) |-- AMOUNT: decimal(15,6) (nullable = true) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5(SchemaMergeUtils.scala:100)
>  at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$5$adapted(SchemaMergeUtils.scala:95)
>  at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) 
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at 
> org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.mergeSchemasInParallel(SchemaMergeUtils.scala:95)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:485)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.inferSchema(ParquetUtils.scala:107)
>  at 
> org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable.inferSchema(ParquetTable.scala:44)
>  at 
> org.apache.spark.sql.execution.datasources.v2.FileTable.$anonfun$dataSchema$4(FileTable.scala:69)
>  at scala.Option.orElse(Option.scala:447) at 
> 

[jira] [Commented] (SPARK-32226) JDBC TimeStamp predicates always append `.0`

2020-07-13 Thread Chen Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17156501#comment-17156501
 ] 

Chen Zhang commented on SPARK-32226:


[~thesuperzapper], glad to receive your reply.

I don't have an Informix database, so I can't test it on my machine.

But I think this problem can be solved by implementing methods such as getCatalystType, getJDBCType, and compileValue in a _[case object InformixDialect extends org.apache.spark.sql.jdbc.JdbcDialect]_.

Maybe you can refer to SPARK-29836.

> JDBC TimeStamp predicates always append `.0`
> 
>
> Key: SPARK-32226
> URL: https://issues.apache.org/jira/browse/SPARK-32226
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Mathew Wicks
>Priority: Major
>
> If you have an Informix column with type `DATETIME YEAR TO SECOND`, Informix 
> will not let you pass a filter of the form `2020-01-01 00:00:00.0` (with the 
> `.0` at the end).
>  
> In Spark 3.0.0, our predicate pushdown will always append this `.0` to the end
> of a TimeStamp column filter, even if you don't specify it:
> {code:java}
> df.where("col1 > '2020-01-01 00:00:00'")
> {code}
>  
> I think we should only pass the `.XXX` suffix if the user passes it in the 
> filter, for example:
> {code:java}
> df.where("col1 > '2020-01-01 00:00:00.123'")
> {code}
>  
> The relevant Spark class is:
> {code:java}
> org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString
> {code}
>  
>  To aid people searching for this error, here is the error emitted by spark:
> {code:java}
> Driver stacktrace:
>   at 
> org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
>   at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
>   at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
>   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>   at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
>   at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
>   at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
>   at scala.Option.foreach(Option.scala:407)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
>   at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
>   at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
>   at 
> org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
>   at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3625)
>   at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2695)
>   at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
>   at org.apache.spark.sql.Dataset.head(Dataset.scala:2695)
>   at org.apache.spark.sql.Dataset.take(Dataset.scala:2902)
>   at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
>   at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:824)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:783)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:792)
>   ... 47 elided
> Caused by: java.sql.SQLException: Extra characters at the end of a datetime 
> or 

[jira] [Commented] (SPARK-32226) JDBC TimeStamp predicates always append `.0`

2020-07-12 Thread Chen Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17156287#comment-17156287
 ] 

Chen Zhang commented on SPARK-32226:


Hello [~thesuperzapper],

A dialect for the Informix database is not implemented in the Spark source code.
You can run code similar to the following to register an Informix dialect, which may resolve the error.
{code:scala}
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Locale
import org.apache.spark.sql.jdbc.JdbcDialect
import org.apache.spark.sql.jdbc.JdbcDialects

case object InformixDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean =
    url.toLowerCase(Locale.ROOT).startsWith("jdbc:informix")

  override def compileValue(value: Any): Any = value match {
    case timestampValue: Timestamp =>
      val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
      val timestampString = dateFormat.format(timestampValue)
      "'" + timestampString + "'"
    case v => super.compileValue(v)
  }
}

JdbcDialects.registerDialect(InformixDialect)
{code}
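
Once the dialect is registered, a JDBC read against Informix should pick it up via canHandle. A hypothetical usage sketch follows; the URL, table name, and driver class are placeholders and have not been tested:
{code:scala}
// Hypothetical usage, assuming an Informix JDBC driver is on the classpath.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:informix-sqli://host:9088/mydb:INFORMIXSERVER=myserver")
  .option("dbtable", "my_table")
  .option("driver", "com.informix.jdbc.IfxDriver")
  .load()

// The timestamp literal in the pushed-down predicate is now rendered by
// InformixDialect.compileValue instead of the default formatting.
df.where("col1 > '2020-01-01 00:00:00'").show()
{code}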

> JDBC TimeStamp predicates always append `.0`
> 
>
> Key: SPARK-32226
> URL: https://issues.apache.org/jira/browse/SPARK-32226
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Mathew Wicks
>Priority: Major
>
> If you have an Informix column with type `DATETIME YEAR TO SECOND`, Informix 
> will not let you pass a filter of the form `2020-01-01 00:00:00.0` (with the 
> `.0` at the end).
>  
> In Spark 3.0.0, our predicate pushdown will always append this `.0` to the end
> of a TimeStamp column filter, even if you don't specify it:
> {code:java}
> df.where("col1 > '2020-01-01 00:00:00'")
> {code}
>  
> I think we should only pass the `.XXX` suffix if the user passes it in the 
> filter, for example:
> {code:java}
> df.where("col1 > '2020-01-01 00:00:00.123'")
> {code}
>  
> The relevant Spark class is:
> {code:java}
> org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString
> {code}
>  
>  To aid people searching for this error, here is the error emitted by spark:
> {code:java}
> Driver stacktrace:
>   at 
> org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
>   at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
>   at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
>   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>   at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
>   at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
>   at 
> org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
>   at scala.Option.foreach(Option.scala:407)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
>   at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
>   at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
>   at 
> org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
>   at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3625)
>   at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2695)
>   at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>   at 

[jira] [Commented] (SPARK-31635) Spark SQL Sort fails when sorting big data points

2020-07-11 Thread Chen Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17156111#comment-17156111
 ] 

Chen Zhang commented on SPARK-31635:


Hello [~george21],

I have submitted a PR.

Please take a look.

[https://github.com/apache/spark/pull/29028]

> Spark SQL Sort fails when sorting big data points
> -
>
> Key: SPARK-31635
> URL: https://issues.apache.org/jira/browse/SPARK-31635
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.2
>Reporter: George George
>Priority: Major
>
>  Please have a look at the example below: 
> {code:java}
> case class Point(x:Double, y:Double)
> case class Nested(a: Long, b: Seq[Point])
> val test = spark.sparkContext.parallelize((1L to 100L).map(a => 
> Nested(a,Seq.fill[Point](25)(Point(1,2, 100)
> test.toDF().as[Nested].sort("a").take(1)
> {code}
>  *Sorting* big data objects using a *Spark DataFrame* fails with the following 
> exception: 
> {code:java}
> 2020-05-04 08:01:00 ERROR TaskSetManager:70 - Total size of serialized 
> results of 14 tasks (107.8 MB) is bigger than spark.driver.maxResultSize 
> (100.0 MB)
> [Stage 0:==> (12 + 3) / 
> 100]org.apache.spark.SparkException: Job aborted due to stage failure: Total 
> size of serialized results of 13 tasks (100.1 MB) is bigger than 
> spark.driver.maxResu
> {code}
> However, using the *RDD API* works and no exception is thrown: 
> {code:java}
> case class Point(x:Double, y:Double)
> case class Nested(a: Long, b: Seq[Point])
> val test = spark.sparkContext.parallelize((1L to 100L).map(a => 
> Nested(a,Seq.fill[Point](25)(Point(1,2, 100)
> test.sortBy(_.a).take(1)
> {code}
> For both code snippets we started the spark shell with exactly the same 
> arguments:
> {code:java}
> spark-shell --driver-memory 6G --conf "spark.driver.maxResultSize=100MB"
> {code}
> Even if we increase spark.driver.maxResultSize, the executors still get 
> killed for our use case. The interesting thing is that when using the RDD API 
> directly the problem is not there. *It looks like there is a bug in DataFrame 
> sort because it is shuffling too much data to the driver?* 
> Note: this is a small example and I reduced spark.driver.maxResultSize to a 
> smaller value; in our application I tried setting it to 8GB, but as mentioned 
> above the job was still killed. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-32212) RDD.takeOrdered can choose to merge intermediate results in executor or driver

2020-07-07 Thread Chen Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Zhang updated SPARK-32212:
---
Summary: RDD.takeOrdered can choose to merge intermediate results in 
executor or driver  (was: RDD.takeOrdered merge intermediate results can be 
configured in driver or executor)

> RDD.takeOrdered can choose to merge intermediate results in executor or driver
> --
>
> Key: SPARK-32212
> URL: https://issues.apache.org/jira/browse/SPARK-32212
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Chen Zhang
>Priority: Major
>
> In the list of existing issues, I saw some discussions about exceeding the 
> memory limit of the driver when using _RDD.takeOrdered_ or _SQL (ORDER BY xx 
> LIMIT xx)_. I think the implementation of _RDD.takeOrdered_ can be improved.
> In the original implementation of _RDD.takeOrdered_, the QuickSelect 
> algorithm from Guava is used in the executor process to compute the local 
> top-K of each RDD partition. These intermediate results are packaged into a 
> java.util.PriorityQueue and returned to the driver process, where they are 
> merged to get the global top-K result.
> The problem with this implementation is that if the intermediate results are 
> large and there are many partitions, they can accumulate in the memory of the 
> driver process, causing excessive memory pressure.
> We can use an optional config to determine whether the intermediate results 
> (local top-K) of each partition in _RDD.takeOrdered_ are merged in the driver 
> process or in an executor process. If set to true, they are merged in the 
> driver process (via util.PriorityQueue), which gives a shorter wait for the 
> result, but large intermediate results over many partitions can pile up in 
> driver memory. If set to false, they are merged in an executor process (via 
> guava.QuickSelect), so the intermediate results do not accumulate on the 
> driver, at the cost of a longer runtime.
> something like:
> _(org.apache.spark.rdd.RDD)_
> {code:scala}
>   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
>     if (num == 0 || partitions.length == 0) {
>       Array.empty
>     } else {
>       if (conf.getBoolean("spark.rdd.takeOrdered.mergeInDriver", true)) {
>         val mapRDDs = mapPartitions { items =>
>           // Priority keeps the largest elements, so let's reverse the ordering.
>           val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
>           queue ++= collectionUtils.takeOrdered(items, num)(ord)
>           Iterator.single(queue)
>         }
>         mapRDDs.reduce { (queue1, queue2) =>
>           queue1 ++= queue2
>           queue1
>         }.toArray.sorted(ord)
>       } else {
>         mapPartitions { items =>
>           collectionUtils.takeOrdered(items, num)(ord)
>         }.repartition(1).mapPartitions { items =>
>           collectionUtils.takeOrdered(items, num)(ord)
>         }.collect()
>       }
>     }
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32212) RDD.takeOrdered merge intermediate results can be configured in driver or executor

2020-07-07 Thread Chen Zhang (Jira)
Chen Zhang created SPARK-32212:
--

 Summary: RDD.takeOrdered merge intermediate results can be 
configured in driver or executor
 Key: SPARK-32212
 URL: https://issues.apache.org/jira/browse/SPARK-32212
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: Chen Zhang


In the list of existing issues, I saw some discussions about exceeding the 
memory limit of the driver when using _RDD.takeOrdered_ or _SQL (ORDER BY xx 
LIMIT xx)_. I think the implementation of _RDD.takeOrdered_ can be improved.

In the original implementation of _RDD.takeOrdered_, the QuickSelect algorithm 
from Guava is used in the executor process to compute the local top-K of each 
RDD partition. These intermediate results are packaged into a 
java.util.PriorityQueue and returned to the driver process, where they are 
merged to get the global top-K result.

The problem with this implementation is that if the intermediate results are 
large and there are many partitions, they can accumulate in the memory of the 
driver process, causing excessive memory pressure.

We can use an optional config to determine whether the intermediate results 
(local top-K) of each partition in _RDD.takeOrdered_ are merged in the driver 
process or in an executor process. If set to true, they are merged in the 
driver process (via util.PriorityQueue), which gives a shorter wait for the 
result, but large intermediate results over many partitions can pile up in 
driver memory. If set to false, they are merged in an executor process (via 
guava.QuickSelect), so the intermediate results do not accumulate on the 
driver, at the cost of a longer runtime.

something like:
_(org.apache.spark.rdd.RDD)_
{code:scala}
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    if (num == 0 || partitions.length == 0) {
      Array.empty
    } else {
      if (conf.getBoolean("spark.rdd.takeOrdered.mergeInDriver", true)) {
        val mapRDDs = mapPartitions { items =>
          // Priority keeps the largest elements, so let's reverse the ordering.
          val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
          queue ++= collectionUtils.takeOrdered(items, num)(ord)
          Iterator.single(queue)
        }
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1
        }.toArray.sorted(ord)
      } else {
        mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.repartition(1).mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.collect()
      }
    }
  }
{code}
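
A hypothetical usage sketch, assuming the proposed flag were adopted exactly as written 
above (the config key does not exist in current Spark releases):
{code:scala}
import org.apache.spark.{SparkConf, SparkContext}

// false => merge the per-partition top-K on an executor instead of the driver,
// trading a longer runtime for lower driver memory pressure.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("takeOrdered-merge-demo")
  .set("spark.rdd.takeOrdered.mergeInDriver", "false")
val sc = new SparkContext(conf)

// With many partitions and a large num, the per-partition queues would stay
// out of driver memory until only the final num elements are collected.
val top = sc.parallelize(1 to 10000000, numSlices = 200).takeOrdered(100000)
{code}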



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-31635) Spark SQL Sort fails when sorting big data points

2020-07-07 Thread Chen Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17147653#comment-17147653
 ] 

Chen Zhang edited comment on SPARK-31635 at 7/7/20, 10:14 AM:
--

In fact, the RDD API corresponding to _DF.sort().take()_ is _RDD.takeOrdered()_.

The execution logic of _RDD.sortBy().take()_ is reservoir sampling plus a 
global bucket sort, and the required number of rows is returned after the 
global sorting result is obtained. All major computation is performed in the 
executor processes.

The execution logic of _RDD.takeOrdered()_ is to compute the top-K of each RDD 
partition in the executor process (by QuickSelect), and then return each top-K 
result to the driver process for merging (by PriorityQueue).

To get the same result, the second method, based on QuickSelect/PriorityQueue, 
clearly has better performance.

I think the implementation of _RDD.takeOrdered()_ can be improved with a 
configurable option that decides whether the top-K merge happens in the driver 
process or in an executor process. Merging in the driver reduces the time spent 
waiting for the result; merging in an executor reduces the memory pressure on 
the driver process.

something like:
 (org.apache.spark.rdd.RDD class)
{code:scala}
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    if (num == 0 || partitions.length == 0) {
      Array.empty
    } else {
      if (conf.getBoolean("spark.rdd.takeOrdered.mergeInDriver", true)) {
        val mapRDDs = mapPartitions { items =>
          // Priority keeps the largest elements, so let's reverse the ordering.
          val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
          queue ++= collectionUtils.takeOrdered(items, num)(ord)
          Iterator.single(queue)
        }
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1
        }.toArray.sorted(ord)
      } else {
        mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.repartition(1).mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.collect()
      }
    }
  }
{code}


was (Author: chen zhang):
In fact, the RDD API corresponding to _DF.sort().take()_ is _RDD.takeOrdered()_.

The execution logic of _RDD.sortBy().take()_ is reservoir sampling plus a 
global bucket sort, and the required number of rows is returned after the 
global sorting result is obtained. All major computation is performed in the 
executor processes.

The execution logic of _RDD.takeOrdered()_ is to compute the top-K of each RDD 
partition in the executor process (by QuickSelect), and then return each top-K 
result to the driver process for merging (by PriorityQueue).

To get the same result, the second method, based on PriorityQueue, clearly has 
better performance.

I think the implementation of _RDD.takeOrdered()_ can be improved with a 
configurable option that decides whether the top-K merge happens in the driver 
process or in an executor process. Merging in the driver reduces the time spent 
waiting for the result; merging in an executor reduces the memory pressure on 
the driver process.

something like:
 (org.apache.spark.rdd.RDD class)
{code:scala}
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    if (num == 0 || partitions.length == 0) {
      Array.empty
    } else {
      if (conf.getBoolean("spark.rdd.takeOrdered.mergeInDriver", true)) {
        val mapRDDs = mapPartitions { items =>
          // Priority keeps the largest elements, so let's reverse the ordering.
          val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
          queue ++= collectionUtils.takeOrdered(items, num)(ord)
          Iterator.single(queue)
        }
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1
        }.toArray.sorted(ord)
      } else {
        mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.repartition(1).mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.collect()
      }
    }
  }
{code}

> Spark SQL Sort fails when sorting big data points
> -
>
> Key: SPARK-31635
> URL: https://issues.apache.org/jira/browse/SPARK-31635
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.2
>Reporter: George George
>Priority: Major
>
>  Please have a look at the example below: 
> {code:java}
> case class Point(x:Double, y:Double)
> case class Nested(a: Long, b: Seq[Point])
> val test = spark.sparkContext.parallelize((1L to 100L).map(a => 
> 

[jira] [Commented] (SPARK-31635) Spark SQL Sort fails when sorting big data points

2020-07-07 Thread Chen Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17152618#comment-17152618
 ] 

Chen Zhang commented on SPARK-31635:


This problem is not a bug, but I think the code implementation is worth 
improving.

I will create a new Improvement issue to discuss it and try to submit a PR.

> Spark SQL Sort fails when sorting big data points
> -
>
> Key: SPARK-31635
> URL: https://issues.apache.org/jira/browse/SPARK-31635
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.2
>Reporter: George George
>Priority: Major
>
>  Please have a look at the example below: 
> {code:java}
> case class Point(x:Double, y:Double)
> case class Nested(a: Long, b: Seq[Point])
> val test = spark.sparkContext.parallelize((1L to 100L).map(a => 
> Nested(a,Seq.fill[Point](25)(Point(1,2, 100)
> test.toDF().as[Nested].sort("a").take(1)
> {code}
>  *Sorting* big data objects using a *Spark DataFrame* fails with the following 
> exception: 
> {code:java}
> 2020-05-04 08:01:00 ERROR TaskSetManager:70 - Total size of serialized 
> results of 14 tasks (107.8 MB) is bigger than spark.driver.maxResultSize 
> (100.0 MB)
> [Stage 0:==> (12 + 3) / 
> 100]org.apache.spark.SparkException: Job aborted due to stage failure: Total 
> size of serialized results of 13 tasks (100.1 MB) is bigger than 
> spark.driver.maxResu
> {code}
> However, using the *RDD API* works and no exception is thrown: 
> {code:java}
> case class Point(x:Double, y:Double)
> case class Nested(a: Long, b: Seq[Point])
> val test = spark.sparkContext.parallelize((1L to 100L).map(a => 
> Nested(a,Seq.fill[Point](25)(Point(1,2, 100)
> test.sortBy(_.a).take(1)
> {code}
> For both code snippets we started the spark shell with exactly the same 
> arguments:
> {code:java}
> spark-shell --driver-memory 6G --conf "spark.driver.maxResultSize=100MB"
> {code}
> Even if we increase spark.driver.maxResultSize, the executors still get 
> killed for our use case. The interesting thing is that when using the RDD API 
> directly the problem is not there. *It looks like there is a bug in DataFrame 
> sort because it is shuffling too much data to the driver?* 
> Note: this is a small example and I reduced spark.driver.maxResultSize to a 
> smaller value; in our application I tried setting it to 8GB, but as mentioned 
> above the job was still killed. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-31635) Spark SQL Sort fails when sorting big data points

2020-07-07 Thread Chen Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17147653#comment-17147653
 ] 

Chen Zhang edited comment on SPARK-31635 at 7/7/20, 10:12 AM:
--

In fact, the RDD API corresponding to _DF.sort().take()_ is _RDD.takeOrdered()_.

The execution logic of _RDD.sortBy().take()_ is reservoir sampling plus a 
global bucket sort, and the required number of rows is returned after the 
global sorting result is obtained. All major computation is performed in the 
executor processes.

The execution logic of _RDD.takeOrdered()_ is to compute the top-K of each RDD 
partition in the executor process (by QuickSelect), and then return each top-K 
result to the driver process for merging (by PriorityQueue).

To get the same result, the second method, based on PriorityQueue, clearly has 
better performance.

I think the implementation of _RDD.takeOrdered()_ can be improved with a 
configurable option that decides whether the top-K merge happens in the driver 
process or in an executor process. Merging in the driver reduces the time spent 
waiting for the result; merging in an executor reduces the memory pressure on 
the driver process.

something like:
 (org.apache.spark.rdd.RDD class)
{code:scala}
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    if (num == 0 || partitions.length == 0) {
      Array.empty
    } else {
      if (conf.getBoolean("spark.rdd.takeOrdered.mergeInDriver", true)) {
        val mapRDDs = mapPartitions { items =>
          // Priority keeps the largest elements, so let's reverse the ordering.
          val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
          queue ++= collectionUtils.takeOrdered(items, num)(ord)
          Iterator.single(queue)
        }
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1
        }.toArray.sorted(ord)
      } else {
        mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.repartition(1).mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.collect()
      }
    }
  }
{code}


was (Author: chen zhang):
In fact, the RDD API corresponding to _DF.sort().take()_ is _RDD.takeOrdered()_.

The execution logic of _RDD.sortBy().take()_ is reservoir sampling plus a 
global bucket sort, and the required number of rows is returned after the 
global sorting result is obtained. All major computation is performed in the 
executor processes.

The execution logic of _RDD.takeOrdered()_ is to compute the top-K (by 
PriorityQueue) of each RDD partition in the executor process, and then return 
each top-K result to the driver process for merging.

To get the same result, the second method, based on PriorityQueue, clearly has 
better performance.

I think the implementation of _RDD.takeOrdered()_ can be improved with a 
configurable option that decides whether the top-K merge happens in the driver 
process or in an executor process. Merging in the driver reduces the time spent 
waiting for the result; merging in an executor reduces the memory pressure on 
the driver process.

something like:
 (org.apache.spark.rdd.RDD class)
{code:scala}
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    if (num == 0 || partitions.length == 0) {
      Array.empty
    } else {
      if (conf.getBoolean("spark.rdd.take.ordered.driver.merge", true)) {
        val mapRDDs = mapPartitions { items =>
          // Priority keeps the largest elements, so let's reverse the ordering.
          val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
          queue ++= collectionUtils.takeOrdered(items, num)(ord)
          Iterator.single(queue)
        }
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1
        }.toArray.sorted(ord)
      } else {
        mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.repartition(1).mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.collect()
      }
    }
  }
{code}

> Spark SQL Sort fails when sorting big data points
> -
>
> Key: SPARK-31635
> URL: https://issues.apache.org/jira/browse/SPARK-31635
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.2
>Reporter: George George
>Priority: Major
>
>  Please have a look at the example below: 
> {code:java}
> case class Point(x:Double, y:Double)
> case class Nested(a: Long, b: Seq[Point])
> val test = spark.sparkContext.parallelize((1L to 100L).map(a => 
> Nested(a,Seq.fill[Point](25)(Point(1,2, 100)
> 

[jira] [Comment Edited] (SPARK-31635) Spark SQL Sort fails when sorting big data points

2020-06-29 Thread Chen Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17147653#comment-17147653
 ] 

Chen Zhang edited comment on SPARK-31635 at 6/29/20, 9:51 AM:
--

In fact, the RDD API corresponding to _DF.sort().take()_ is _RDD.takeOrdered()_.

The execution logic of _RDD.sortBy().take()_ is reservoir sampling plus a 
global bucket sort, and the required number of rows is returned after the 
global sorting result is obtained. All major computation is performed in the 
executor processes.

The execution logic of _RDD.takeOrdered()_ is to compute the top-K (by 
PriorityQueue) of each RDD partition in the executor process, and then return 
each top-K result to the driver process for merging.

To get the same result, the second method, based on PriorityQueue, clearly has 
better performance.

I think the implementation of _RDD.takeOrdered()_ can be improved with a 
configurable option that decides whether the top-K merge happens in the driver 
process or in an executor process. Merging in the driver reduces the time spent 
waiting for the result; merging in an executor reduces the memory pressure on 
the driver process.

something like:
 (org.apache.spark.rdd.RDD class)
{code:scala}
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    if (num == 0 || partitions.length == 0) {
      Array.empty
    } else {
      if (conf.getBoolean("spark.rdd.take.ordered.driver.merge", true)) {
        val mapRDDs = mapPartitions { items =>
          // Priority keeps the largest elements, so let's reverse the ordering.
          val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
          queue ++= collectionUtils.takeOrdered(items, num)(ord)
          Iterator.single(queue)
        }
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1
        }.toArray.sorted(ord)
      } else {
        mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.repartition(1).mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.collect()
      }
    }
  }
{code}


was (Author: chen zhang):
In fact, the RDD API corresponding to _DF.sort().take()_ is _RDD.takeOrdered()_.

The execution logic of _RDD.sortBy().take()_ is reservoir sampling plus a 
global bucket sort, and the required number of rows is returned after the 
global sorting result is obtained. All major computation is performed in the 
executor processes.

The execution logic of _RDD.takeOrdered()_ is to compute the top-K (by 
PriorityQueue) of each RDD partition in the executor process, and then return 
each top-K result to the driver process for merging.

To get the same result, the second method, based on PriorityQueue, clearly has 
better performance.

I think the implementation of _RDD.takeOrdered()_ can be improved with a 
configurable option that decides whether the top-K merge happens in the driver 
process or in an executor process. Merging in the driver reduces the time spent 
waiting for the result; merging in an executor reduces the memory pressure on 
the driver process.

something like:
 (org.apache.spark.rdd.RDD class)
{code:scala}
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    if (num == 0 || partitions.length == 0) {
      Array.empty
    } else {
      if (conf.getBoolean("spark.rdd.take.ordered.driver.merge", true)) {
        val mapRDDs = mapPartitions { items =>
          // Priority keeps the largest elements, so let's reverse the ordering.
          val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
          queue ++= collectionUtils.takeOrdered(items, num)(ord)
          Iterator.single(queue)
        }
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1
        }.toArray.sorted(ord)
      } else {
        mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.repartition(1).mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.collect()
      }
    }
  }
{code}

> Spark SQL Sort fails when sorting big data points
> -
>
> Key: SPARK-31635
> URL: https://issues.apache.org/jira/browse/SPARK-31635
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.2
>Reporter: George George
>Priority: Major
>
>  Please have a look at the example below: 
> {code:java}
> case class Point(x:Double, y:Double)
> case class Nested(a: Long, b: Seq[Point])
> val test = spark.sparkContext.parallelize((1L to 100L).map(a => 
> Nested(a,Seq.fill[Point](25)(Point(1,2, 100)
> 

[jira] [Commented] (SPARK-31635) Spark SQL Sort fails when sorting big data points

2020-06-29 Thread Chen Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17147653#comment-17147653
 ] 

Chen Zhang commented on SPARK-31635:


In fact, the RDD API corresponding to _DF.sort().take()_ is _RDD.takeOrdered()_.

The execution logic of _RDD.sortBy().take()_ is reservoir sampling plus a 
global bucket sort, and the required number of rows is returned after the 
global sorting result is obtained. All major computation is performed in the 
executor processes.

The execution logic of _RDD.takeOrdered()_ is to compute the top-K (by 
PriorityQueue) of each RDD partition in the executor process, and then return 
each top-K result to the driver process for merging.

To get the same result, the second method, based on PriorityQueue, clearly has 
better performance.

I think the implementation of _RDD.takeOrdered()_ can be improved with a 
configurable option that decides whether the top-K merge happens in the driver 
process or in an executor process. Merging in the driver reduces the time spent 
waiting for the result; merging in an executor reduces the memory pressure on 
the driver process.

something like:
 (org.apache.spark.rdd.RDD class)
{code:scala}
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    if (num == 0 || partitions.length == 0) {
      Array.empty
    } else {
      if (conf.getBoolean("spark.rdd.take.ordered.driver.merge", true)) {
        val mapRDDs = mapPartitions { items =>
          // Priority keeps the largest elements, so let's reverse the ordering.
          val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
          queue ++= collectionUtils.takeOrdered(items, num)(ord)
          Iterator.single(queue)
        }
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1
        }.toArray.sorted(ord)
      } else {
        mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.repartition(1).mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.collect()
      }
    }
  }
{code}

> Spark SQL Sort fails when sorting big data points
> -
>
> Key: SPARK-31635
> URL: https://issues.apache.org/jira/browse/SPARK-31635
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.2
>Reporter: George George
>Priority: Major
>
>  Please have a look at the example below: 
> {code:java}
> case class Point(x:Double, y:Double)
> case class Nested(a: Long, b: Seq[Point])
> val test = spark.sparkContext.parallelize((1L to 100L).map(a => 
> Nested(a,Seq.fill[Point](25)(Point(1,2, 100)
> test.toDF().as[Nested].sort("a").take(1)
> {code}
>  *Sorting* big data objects using a *Spark DataFrame* fails with the following 
> exception: 
> {code:java}
> 2020-05-04 08:01:00 ERROR TaskSetManager:70 - Total size of serialized 
> results of 14 tasks (107.8 MB) is bigger than spark.driver.maxResultSize 
> (100.0 MB)
> [Stage 0:==> (12 + 3) / 
> 100]org.apache.spark.SparkException: Job aborted due to stage failure: Total 
> size of serialized results of 13 tasks (100.1 MB) is bigger than 
> spark.driver.maxResu
> {code}
> However, using the *RDD API* works and no exception is thrown: 
> {code:java}
> case class Point(x:Double, y:Double)
> case class Nested(a: Long, b: Seq[Point])
> val test = spark.sparkContext.parallelize((1L to 100L).map(a => 
> Nested(a,Seq.fill[Point](25)(Point(1,2, 100)
> test.sortBy(_.a).take(1)
> {code}
> For both code snippets we started the spark shell with exactly the same 
> arguments:
> {code:java}
> spark-shell --driver-memory 6G --conf "spark.driver.maxResultSize=100MB"
> {code}
> Even if we increase spark.driver.maxResultSize, the executors still get 
> killed for our use case. The interesting thing is that when using the RDD API 
> directly the problem is not there. *It looks like there is a bug in DataFrame 
> sort because it is shuffling too much data to the driver?* 
> Note: this is a small example and I reduced spark.driver.maxResultSize to a 
> smaller value; in our application I tried setting it to 8GB, but as mentioned 
> above the job was still killed. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32109) SQL hash function handling of nulls makes collision too likely

2020-06-28 Thread Chen Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17147328#comment-17147328
 ] 

Chen Zhang commented on SPARK-32109:


The logic in the source code can be represented by the following pseudocode.
{code:scala}
def computeHash(value: Any, hashSeed: Long): Long = {
  value match {
    case null => hashSeed
    case b: Boolean => hashInt(if (b) 1 else 0, hashSeed)  // Murmur3Hash
    case i: Int => hashInt(i, hashSeed)
    ...
  }
}

val seed = 42L
var hash = seed
var i = 0
val len = columns.length
while (i < len) {
  hash = computeHash(columns(i).value, hash)
  i += 1
}
hash
{code}
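
The collision follows from that pseudocode alone. Here is a minimal standalone sketch 
(hashInt below is a stand-in for Spark's Murmur3 step, not the real implementation): 
because a null simply returns the running hash unchanged, hash(x, null) and hash(null, x) 
both reduce to hashInt(x, seed).
{code:scala}
import scala.util.hashing.MurmurHash3

// Stand-in for Spark's hashInt: mix one int into the running hash.
def hashInt(i: Int, seed: Int): Int = MurmurHash3.mix(seed, i)

def computeHash(value: Any, seed: Int): Int = value match {
  case null   => seed              // null leaves the running hash untouched
  case i: Int => hashInt(i, seed)
}

def rowHash(columns: Seq[Any], seed: Int = 42): Int =
  columns.foldLeft(seed)((h, v) => computeHash(v, h))

rowHash(Seq(7, null))  // == hashInt(7, 42)
rowHash(Seq(null, 7))  // == hashInt(7, 42): the null's position is lost
{code}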
I could solve this problem by modifying the following code (the eval and 
doGenCode functions of the 
org.apache.spark.sql.catalyst.expressions.HashExpression class):
{code:scala}
override def eval(input: InternalRow = null): Any = {
  var hash = seed
  var i = 0
  val len = children.length
  while (i < len) {
    // hash = computeHash(children(i).eval(input), children(i).dataType, hash)
    hash = (31 * hash) +
      computeHash(children(i).eval(input), children(i).dataType, hash)
    i += 1
  }
  hash
}
{code}
But I don't think it's necessary to modify the code; if we do, the hash values 
change, and with them the existing data distribution.
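
For instance (an illustrative sketch with hypothetical table and column names, not 
something from this issue), bucketed tables derive their bucket ids from this same hash 
expression, so changing the formula would leave existing files under bucket ids the new 
hash no longer produces:
{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hash-layout-demo").getOrCreate()
import spark.implicits._

// Bucket id is roughly pmod(hash(name, alias), numBuckets). Changing the hash
// formula would map existing rows to different buckets than the files on disk.
Seq(("john", null: String), (null: String, "john")).toDF("name", "alias")
  .write
  .bucketBy(8, "name", "alias")
  .saveAsTable("people_bucketed")  // hypothetical table name
{code}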

> SQL hash function handling of nulls makes collision too likely
> --
>
> Key: SPARK-32109
> URL: https://issues.apache.org/jira/browse/SPARK-32109
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: koert kuipers
>Priority: Minor
>
> this ticket is about org.apache.spark.sql.functions.hash and Spark's handling 
> of nulls when hashing sequences.
> {code:java}
> scala> spark.sql("SELECT hash('bar', null)").show()
> +---+
> |hash(bar, NULL)|
> +---+
> |-1808790533|
> +---+
> scala> spark.sql("SELECT hash(null, 'bar')").show()
> +---+
> |hash(NULL, bar)|
> +---+
> |-1808790533|
> +---+
>  {code}
> these are different sequences, e.g. they could be positions 0 and 1 in a 
> dataframe which are different columns with entirely different meanings. The 
> hashes should not be the same.
> another example:
> {code:java}
> scala> Seq(("john", null), (null, "john")).toDF("name", 
> "alias").withColumn("hash", hash(col("name"), col("alias"))).show
> ++-+-+
> |name|alias| hash|
> ++-+-+
> |john| null|487839701|
> |null| john|487839701|
> ++-+-+ {code}
> instead of ignoring nulls, each null should apply a transform to the hash so 
> that the order of elements, including the nulls, matters for the outcome.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-10925) Exception when joining DataFrames

2016-09-02 Thread Chen Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15458996#comment-15458996
 ] 

Chen Zhang commented on SPARK-10925:


I have the same issue too. Very annoying. After I do several joins, this shows 
up.

> Exception when joining DataFrames
> -
>
> Key: SPARK-10925
> URL: https://issues.apache.org/jira/browse/SPARK-10925
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.0, 1.5.1
> Environment: Tested with Spark 1.5.0 and Spark 1.5.1
>Reporter: Alexis Seigneurin
> Attachments: Photo 05-10-2015 14 31 16.jpg, TestCase2.scala
>
>
> I get an exception when joining a DataFrame with another DataFrame. The 
> second DataFrame was created by performing an aggregation on the first 
> DataFrame.
> My complete workflow is:
> # read the DataFrame
> # apply an UDF on column "name"
> # apply an UDF on column "surname"
> # apply an UDF on column "birthDate"
> # aggregate on "name" and re-join with the DF
> # aggregate on "surname" and re-join with the DF
> If I remove one step, the process completes normally.
> Here is the exception:
> {code}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved 
> attribute(s) surname#20 missing from id#0,birthDate#3,name#10,surname#7 in 
> operator !Project [id#0,birthDate#3,name#10,surname#20,UDF(birthDate#3) AS 
> birthDate_cleaned#8];
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:37)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:154)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:49)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:103)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:102)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:102)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:102)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:102)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:102)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:102)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:102)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:102)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:102)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:102)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:102)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:102)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:102)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:102)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:102)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:49)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
>   at 
> org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:914)
>   at org.apache.spark.sql.DataFrame.(DataFrame.scala:132)
>   at 
> org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
>   at org.apache.spark.sql.DataFrame.join(DataFrame.scala:553)
>   at org.apache.spark.sql.DataFrame.join(DataFrame.scala:520)
>   at TestCase2$.main(TestCase2.scala:51)
>   at TestCase2.main(TestCase2.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at