[jira] [Updated] (SPARK-19299) Nulls in non nullable columns causes data corruption in parquet

2019-05-20 Thread Hyukjin Kwon (JIRA)


 [ https://issues.apache.org/jira/browse/SPARK-19299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-19299:
-
Labels: bulk-closed  (was: )

> Nulls in non nullable columns causes data corruption in parquet
> ---
>
> Key: SPARK-19299
> URL: https://issues.apache.org/jira/browse/SPARK-19299
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core
>Affects Versions: 1.6.0, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>Reporter: Franklyn Dsouza
>Priority: Major
>  Labels: bulk-closed
>
> The problem we're seeing is that if a null occurs in a non-nullable field and
> is written to Parquet, the resulting file is corrupted and cannot be read
> back correctly.
> One way this can occur is when a long value in Python overflows the SQL
> LongType; the overflow produces a null value inside the DataFrame.
> We're also seeing that the behaviour differs depending on whether or not
> the vectorized reader is enabled.
> Here's an example in PySpark:
> {code}
> from datetime import datetime
> from pyspark.sql import types
> data = [
>   (1, 6),
>   (2, 7),
>   (3, 2 ** 64), # value overflows sql LongType
>   (4, 8),
>   (5, 9)
> ]
> schema = types.StructType([
>   types.StructField("index", types.LongType(), False),
>   types.StructField("long", types.LongType(), False),
> ])
> df = sqlCtx.createDataFrame(data, schema)
> df.collect()
> df.write.parquet("corrupt_parquet")
> df_parquet = sqlCtx.read.parquet("corrupt_parquet/*.parquet")
> df_parquet.collect()
> {code}
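
To reproduce both behaviours described below, the vectorized Parquet reader can be toggled per session. A minimal sketch, assuming the same Spark 2.x sqlCtx used in the repro above (spark.sql.parquet.enableVectorizedReader is the relevant setting in 2.x; it does not exist in 1.6):

{code}
# Sketch only: compare the vectorized and non-vectorized Parquet read paths.
sqlCtx.setConf("spark.sql.parquet.enableVectorizedReader", "true")   # vectorized reader (the 2.x default)
rows_vectorized = sqlCtx.read.parquet("corrupt_parquet/*.parquet").collect()

sqlCtx.setConf("spark.sql.parquet.enableVectorizedReader", "false")  # row-by-row parquet-mr path
rows_rowwise = sqlCtx.read.parquet("corrupt_parquet/*.parquet").collect()  # raises the ParquetDecodingException shown below
{code}
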
> With the vectorized reader enabled, this produces:
> {code}
> In [2]: df.collect()
> Out[2]:
> [Row(index=1, long=6),
>  Row(index=2, long=7),
>  Row(index=3, long=None),
>  Row(index=4, long=8),
>  Row(index=5, long=9)]
> In [3]: df_parquet.collect()
> Out[3]:
> [Row(index=1, long=6),
>  Row(index=2, long=7),
>  Row(index=3, long=8),
>  Row(index=4, long=9),
>  Row(index=5, long=5)]
> {code}
> As you can see, reading the data back from disk causes values to be shifted up
> and across columns.
> With the vectorized reader disabled, we are completely unable to read the file:
> {code}
> Py4JJavaError: An error occurred while calling o143.collectToPython.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3, localhost): org.apache.parquet.io.ParquetDecodingException: Can not read value at 4 in block 0 in file file:/Users/franklyndsouza/dev/starscream/corrupt/part-r-0-4fa5aee8-2138-4e0c-b6d8-22a418d90fd3.snappy.parquet
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
>   at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
>   at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can't read value in column [long] INT64 at value 5 out of 5, 5 out of 5 in currentPage. repetition level: 0, definition level: 0
>   at org.apache.parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:462)
>   at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:364)
>   at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405)
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:209)
>   ... 19 more
> Caused by: org.apache.parquet.io.ParquetDecodingException: could not read long
>   at org.apache.parquet.column.values.plain.PlainValuesReader$LongPlainValuesReader.readLong(PlainValuesReader.java:131)
>   at org.apache.parquet.column.impl.ColumnReaderImpl$2$4.read(ColumnReaderImpl.java:258)
>   at 
> 
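
Until the underlying bug is addressed, one defensive pattern (a sketch, not an official workaround; the helper name is made up for illustration) is to verify, before writing, that columns declared non-nullable really contain no nulls, so a silent overflow-to-null cannot violate the declared schema:

{code}
from pyspark.sql import functions as F

def assert_required_columns_not_null(df):
    # Fail fast instead of writing a Parquet file whose data violates its own schema.
    required = [f.name for f in df.schema.fields if not f.nullable]
    for name in required:
        nulls = df.filter(F.col(name).isNull()).count()
        if nulls:
            raise ValueError("column %r is declared non-nullable but contains %d null rows" % (name, nulls))

assert_required_columns_not_null(df)  # raises here for the 'long' column in the repro above
df.write.parquet("corrupt_parquet")   # only reached when the declared schema actually holds
{code}
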

[jira] [Updated] (SPARK-19299) Nulls in non nullable columns causes data corruption in parquet

2017-10-08 Thread Hyukjin Kwon (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-19299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-19299:
-
Priority: Major  (was: Critical)


[jira] [Updated] (SPARK-19299) Nulls in non nullable columns causes data corruption in parquet

2017-01-20 Thread Franklyn Dsouza (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-19299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Franklyn Dsouza updated SPARK-19299:

Summary: Nulls in non nullable columns causes data corruption in parquet  
(was: Nulls in non nullable columns cause data corruption in parquet)


[jira] [Updated] (SPARK-19299) Nulls in non nullable columns causes data corruption in parquet

2017-01-20 Thread Franklyn Dsouza (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-19299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Franklyn Dsouza updated SPARK-19299:

Description: 

[jira] [Updated] (SPARK-19299) Nulls in non nullable columns causes data corruption in parquet

2017-01-20 Thread Franklyn Dsouza (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-19299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Franklyn Dsouza updated SPARK-19299:

Description: 

[jira] [Updated] (SPARK-19299) Nulls in non nullable columns causes data corruption in parquet

2017-01-20 Thread Franklyn Dsouza (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-19299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Franklyn Dsouza updated SPARK-19299:

Description: 

[jira] [Updated] (SPARK-19299) Nulls in non nullable columns causes data corruption in parquet

2017-01-20 Thread Franklyn Dsouza (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-19299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Franklyn Dsouza updated SPARK-19299:

Priority: Critical  (was: Major)


[jira] [Updated] (SPARK-19299) Nulls in non nullable columns causes data corruption in parquet

2017-01-19 Thread Franklyn Dsouza (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-19299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Franklyn Dsouza updated SPARK-19299:

Description: 

[jira] [Updated] (SPARK-19299) Nulls in non nullable columns causes data corruption in parquet

2017-01-19 Thread Franklyn Dsouza (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-19299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Franklyn Dsouza updated SPARK-19299:

Description: 

[jira] [Updated] (SPARK-19299) Nulls in non nullable columns causes data corruption in parquet

2017-01-19 Thread Franklyn Dsouza (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-19299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Franklyn Dsouza updated SPARK-19299:

Description: 

[jira] [Updated] (SPARK-19299) Nulls in non nullable columns causes data corruption in parquet

2017-01-19 Thread Franklyn Dsouza (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-19299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Franklyn Dsouza updated SPARK-19299:

Description: 

[jira] [Updated] (SPARK-19299) Nulls in non nullable columns causes data corruption in parquet

2017-01-19 Thread Franklyn Dsouza (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-19299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Franklyn Dsouza updated SPARK-19299:

Description: 