[ https://issues.apache.org/jira/browse/SPARK-34167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17269182#comment-17269182 ]

Attila Zsolt Piros edited comment on SPARK-34167 at 1/21/21, 9:49 AM:
----------------------------------------------------------------------

[~razajafri] could you please share with us how the parquet files are created?

I tried to reproduce this issue in the following way but I had no luck:

{noformat}
Spark context Web UI available at http://192.168.0.17:4045
Spark context available as 'sc' (master = local, app id = local-1611221568779).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.1
      /_/

Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_242)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import java.math.BigDecimal
import java.math.BigDecimal

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> import org.apache.spark.sql.types.{DecimalType, StructField, StructType}
import org.apache.spark.sql.types.{DecimalType, StructField, StructType}

scala> val schema = StructType(Array(StructField("num", DecimalType(8,2),true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(num,DecimalType(8,2),true))

scala> val rdd = sc.parallelize((0 to 9).map(v => new BigDecimal(s"123456.7$v")))
rdd: org.apache.spark.rdd.RDD[java.math.BigDecimal] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> val df = spark.createDataFrame(rdd.map(Row(_)), schema)
df: org.apache.spark.sql.DataFrame = [num: decimal(8,2)]

scala> df.show()
+---------+
|      num|
+---------+
|123456.70|
|123456.71|
|123456.72|
|123456.73|
|123456.74|
|123456.75|
|123456.76|
|123456.77|
|123456.78|
|123456.79|
+---------+


scala> df.write.parquet("num.parquet")

scala> spark.read.parquet("num.parquet").show()
+---------+
|      num|
+---------+
|123456.70|
|123456.71|
|123456.72|
|123456.73|
|123456.74|
|123456.75|
|123456.76|
|123456.77|
|123456.78|
|123456.79|
+---------+

{noformat}
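
With the defaults this write path should store the decimal(8,2) column as INT32, so it would also help to see the physical Parquet type in the attached files. One quick way to check is sketched below, using the parquet-hadoop classes that the spark-shell already has on its classpath (the path is only a placeholder for one of the attached part files):

{noformat}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// placeholder path: point it at one of the attached part files
val in = HadoopInputFile.fromPath(new Path("/tmp/attached.snappy.parquet"), new Configuration())
val reader = ParquetFileReader.open(in)
// prints the Parquet message type, i.e. whether "num" is backed by INT32 or INT64
println(reader.getFooter.getFileMetaData.getSchema)
reader.close()
{noformat}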





> Reading parquet with Decimal(8,2) written as a Decimal64 blows up
> -----------------------------------------------------------------
>
>                 Key: SPARK-34167
>                 URL: https://issues.apache.org/jira/browse/SPARK-34167
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output
>    Affects Versions: 3.0.1
>            Reporter: Raza Jafri
>            Priority: Major
>         Attachments: 
> part-00000-7fecd321-b247-4f7e-bff5-c2e4d8facaa0-c000.snappy.parquet, 
> part-00000-940f44f1-f323-4a5e-b828-1e65d87895aa-c000.snappy.parquet
>
>
> When reading a parquet file in which decimals with precision < 10 were written 
> using a 64-bit (INT64) representation, Spark tries to read them back as INTs and fails.
>  
> Steps to reproduce:
> Read the attached file that has a single Decimal(8,2) column with 10 values
> {code:java}
> scala> spark.read.parquet("/tmp/pyspark_tests/936454/PARQUET_DATA").show
> ...
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putLong(OnHeapColumnVector.java:327)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readLongs(VectorizedRleValuesReader.java:370)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readLongBatch(VectorizedColumnReader.java:514)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:256)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:273)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:171)
>   at 
> org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>   at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
>   at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173)
>   at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
>   at 
> org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:497)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:756)
>   at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
>   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
>   at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>   at org.apache.spark.scheduler.Task.run(Task.scala:127)
>   at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:480)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1426)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:483)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> ...
> {code}
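>
> As a cross-check (only a sketch, with the vectorized path assumed to be the 
> culprit), reading the same file with the vectorized Parquet reader turned off 
> should bypass the {{OnHeapColumnVector}} code path shown in the stack trace:
> {code:java}
> // turn off the vectorized reader and re-read the attached file
> scala> spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
> scala> spark.read.parquet("/tmp/pyspark_tests/936454/PARQUET_DATA").show
> {code}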
>  
>  
> Here are my findings. The *{{VectorizedParquetRecordReader}}* reads the 
> parquet file correctly because it bases the read on the 
> [requestedSchema|https://github.com/apache/spark/blob/e6f019836c099398542b443f7700f79de81da0d5/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java#L150], 
> which is a MessageType and has the underlying data correctly stored as 
> {{INT64}}, whereas the *{{OnHeapColumnVector}}* is initialized from the 
> [batchSchema|https://github.com/apache/spark/blob/e6f019836c099398542b443f7700f79de81da0d5/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java#L151], 
> which comes from {{org.apache.spark.sql.parquet.row.requested_schema}} as set 
> by the reader; that schema is a {{StructType}} and only contains 
> {{Decimal(_,_)}}.
> [https://github.com/apache/spark/blob/a44e008de3ae5aecad9e0f1a7af6a1e8b0d97f4e/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java#L224]
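>
> A minimal illustration of that mismatch (a hypothetical snippet using the 
> internal classes directly from a spark-shell): allocating the column vector for 
> {{DecimalType(8,2)}} gives an int-backed vector because the precision fits in 
> 32 bits, so the {{putLong}} call made for the file's {{INT64}} data hits a null 
> long buffer:
> {code:java}
> import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
> import org.apache.spark.sql.types.DecimalType
>
> // precision 8 <= 9, so only int storage is allocated for this vector
> val vec = new OnHeapColumnVector(10, DecimalType(8, 2))
> // the reader sees INT64 in the file and calls putLong -> NullPointerException
> vec.putLong(0, 12345670L)
> {code}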
>  
> Attached are two files, one with Decimal(8,2) and the other with Decimal(1,1), 
> both written as decimals backed by INT64. Decimal(1,1) results in a different 
> exception, but it fails for the same reason.
>  
>  


