[jira] [Commented] (PARQUET-2170) Empty projection returns the wrong number of rows when column index is enabled

2022-08-04 Thread Ivan Sadikov (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17575562#comment-17575562
 ] 

Ivan Sadikov commented on PARQUET-2170:
---

I will update the description later, and I would like to open a PR to fix the 
issue. I think we just need to check whether the column set is empty when 
checking paths in the ColumnIndexFilter, but I will need to confirm this.
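
A minimal sketch of the idea (illustration only, not the actual ColumnIndexFilter 
code; the {{filteredRowCount}} helper and its parameters are made up):
{code:scala}
// Hypothetical helper: when the projected column set is empty there is nothing
// to evaluate the filter against, so fall back to the full row count instead of
// treating every column as "missing" and returning an empty row range.
def filteredRowCount(paths: Set[String], rowCount: Long, filteredCount: Long): Long =
  if (paths.isEmpty) rowCount else filteredCount

// filteredRowCount(Set.empty, 100L, 0L) == 100L
{code}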

> Empty projection returns the wrong number of rows when column index is enabled
> --
>
> Key: PARQUET-2170
> URL: https://issues.apache.org/jira/browse/PARQUET-2170
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Reporter: Ivan Sadikov
>Priority: Major
>
> Discovered in Spark: when reading an empty projection from a Parquet file 
> with filter pushdown enabled (typically when doing filter + count), 
> parquet-mr returns the wrong number of rows when the column index is enabled. 
> When the column index feature is disabled, the result is correct.
>  
> This happens due to the following:
>  # ParquetFileReader::getFilteredRowCount() 
> ([https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L851]) 
>  selects row ranges to calculate the row count when the column index is enabled.
>  # In ColumnIndexFilter 
> ([https://github.com/apache/parquet-mr/blob/0819356a9dafd2ca07c5eab68e2bffeddc3bd3d9/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java#L80]) 
>  we filter row ranges and pass the set of paths, which in this case is empty.
>  # When evaluating the filter, if the column path is not in the set, we would 
> return an empty list of rows 
> ([https://github.com/apache/parquet-mr/blob/0819356a9dafd2ca07c5eab68e2bffeddc3bd3d9/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java#L178]), 
>  which is always the case for an empty projection.
>  # This results in the incorrect number of records reported by the library.
> I will provide the full repro later.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (PARQUET-2170) Empty projection returns the wrong number of rows when column index is enabled

2022-08-04 Thread Ivan Sadikov (Jira)
Ivan Sadikov created PARQUET-2170:
-

 Summary: Empty projection returns the wrong number of rows when 
column index is enabled
 Key: PARQUET-2170
 URL: https://issues.apache.org/jira/browse/PARQUET-2170
 Project: Parquet
  Issue Type: Bug
  Components: parquet-mr
Reporter: Ivan Sadikov


Discovered in Spark: when reading an empty projection from a Parquet file 
with filter pushdown enabled (typically when doing filter + count), parquet-mr 
returns the wrong number of rows when the column index is enabled. When the 
column index feature is disabled, the result is correct.

 

This happens due to the following:
 # ParquetFileReader::getFilteredRowCount() 
([https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L851]) 
 selects row ranges to calculate the row count when the column index is enabled.
 # In ColumnIndexFilter 
([https://github.com/apache/parquet-mr/blob/0819356a9dafd2ca07c5eab68e2bffeddc3bd3d9/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java#L80]) 
 we filter row ranges and pass the set of paths, which in this case is empty.
 # When evaluating the filter, if the column path is not in the set, we would 
return an empty list of rows 
([https://github.com/apache/parquet-mr/blob/0819356a9dafd2ca07c5eab68e2bffeddc3bd3d9/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java#L178]), 
 which is always the case for an empty projection.
 # This results in the incorrect number of records reported by the library.

I will provide the full repro later.
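
Until then, a hypothetical Spark snippet illustrating the affected pattern (the 
path and column name are made up; this is not the promised repro):
{code:scala}
import org.apache.spark.sql.functions._

// A pushed-down filter followed by count() requests zero columns from Parquet.
// With the column index enabled, parquet-mr can then return the wrong row count.
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
val df = spark.read.parquet("/tmp/data.parquet")  // made-up path
println(df.filter(col("id") > 10).count())
{code}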

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (PARQUET-1632) Negative initial size when writing large values in parquet-mr

2019-08-09 Thread Ivan Sadikov (JIRA)


[ 
https://issues.apache.org/jira/browse/PARQUET-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16903899#comment-16903899
 ] 

Ivan Sadikov commented on PARQUET-1632:
---

Actually, setting the page.size configuration to smaller values results in 
PARQUET-1633, where one can't read the file one has written.

Yes, I see what you mean: arrayOutput overflowed, which resulted in a negative 
size that passed the page max size check in the writePage method and instead 
failed when running buf.collect.
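
For illustration, the kind of overflow being described (the concrete size below 
is made up, not taken from the failing job):
{code:scala}
// Casting a byte count just over 2 GiB from Long to Int wraps to a negative
// value, which is how a "Negative initial size" reaches ByteArrayOutputStream
// even though the earlier page-size checks pass.
val size: Long = (2L << 30) + (512L << 20)  // 2684354560 bytes, ~2.5 GiB
println(size.toInt)                         // -1610612736
{code}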

> Negative initial size when writing large values in parquet-mr
> -
>
> Key: PARQUET-1632
> URL: https://issues.apache.org/jira/browse/PARQUET-1632
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Affects Versions: 1.10.1
>Reporter: Ivan Sadikov
>Assignee: Junjie Chen
>Priority: Major
>
> I encountered an issue when writing large string values to Parquet.
> Here is the code to reproduce the issue:
> {code:java}
> import org.apache.spark.sql.functions._
> def longString: String = "a" * (64 * 1024 * 1024)
> val long_string = udf(() => longString)
> val df = spark.range(0, 40, 1, 1).withColumn("large_str", long_string())
> spark.conf.set("parquet.enable.dictionary", "false")
> df.write.option("compression", 
> "uncompressed").mode("overwrite").parquet("/tmp/large.parquet"){code}
>  
> This Spark job fails with the exception:
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 0 in stage 10861.0 failed 4 times, most recent failure: Lost task 0.3 in 
> stage 10861.0 (TID 671168, 10.0.180.13, executor 14656): 
> org.apache.spark.SparkException: Task failed while writing rows. at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at 
> org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at 
> org.apache.spark.scheduler.Task.run(Task.scala:112) at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748) 
> Caused by: java.lang.IllegalArgumentException: Negative initial size: 
> -1610612543 at 
> java.io.ByteArrayOutputStream.(ByteArrayOutputStream.java:74) at 
> org.apache.parquet.bytes.BytesInput$BAOS.(BytesInput.java:234) at 
> org.apache.parquet.bytes.BytesInput$BAOS.(BytesInput.java:232) at 
> org.apache.parquet.bytes.BytesInput.toByteArray(BytesInput.java:202) at 
> org.apache.parquet.bytes.ConcatenatingByteArrayCollector.collect(ConcatenatingByteArrayCollector.java:33)
>  at 
> org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:126)
>  at 
> org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
>  at 
> org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235) 
> at 
> org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
>  at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
>  at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
>  at 
> org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
>  at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1560)
>  at 
> 

[jira] [Comment Edited] (PARQUET-1632) Negative initial size when writing large values in parquet-mr

2019-08-08 Thread Ivan Sadikov (JIRA)


[ 
https://issues.apache.org/jira/browse/PARQUET-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16903092#comment-16903092
 ] 

Ivan Sadikov edited comment on PARQUET-1632 at 8/8/19 3:57 PM:
---

Left a comment already. It looks like there is an issue of treating BytesInput 
as a byte array when it should be treated as a stream of bytes. Also, the bytes 
collector should gracefully handle adding large BytesInput instances.

 

The recommended configuration setting is the current mitigation, not a solution 
to the problem. Plus, the config is about tweaking the page size, not the column 
chunk size (see the comment above).
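
A rough sketch of the "treat it as a stream" idea (my illustration, not the 
parquet-mr code; the {{copyInSlabs}} helper is made up):
{code:scala}
import java.io.{InputStream, OutputStream}

// Copy arbitrarily large input to the output in bounded slabs instead of
// materializing one contiguous byte array whose length must fit in an Int.
def copyInSlabs(in: InputStream, out: OutputStream, slabSize: Int = 8 << 20): Long = {
  val slab = new Array[Byte](slabSize)
  var total = 0L
  var n = in.read(slab)
  while (n >= 0) {
    out.write(slab, 0, n)
    total += n
    n = in.read(slab)
  }
  total  // a Long, so totals beyond 2 GiB are fine
}
{code}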


was (Author: sadikovi):
Left a comment already. It looks like there is an issue of treating BytesInput 
as a byte array when it should be treated as a stream of bytes. Also, the bytes 
collector should gracefully handle adding large BytesInput instances.

 

The recommended configuration setting is the current mitigation, not a solution 
to the problem. Plus, it is about the page size, not the column chunk size (see 
the comment above).

> Negative initial size when writing large values in parquet-mr
> -
>
> Key: PARQUET-1632
> URL: https://issues.apache.org/jira/browse/PARQUET-1632
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Affects Versions: 1.10.1
>Reporter: Ivan Sadikov
>Assignee: Junjie Chen
>Priority: Major
>
> I encountered an issue when writing large string values to Parquet.
> Here is the code to reproduce the issue:
> {code:java}
> import org.apache.spark.sql.functions._
> def longString: String = "a" * (64 * 1024 * 1024)
> val long_string = udf(() => longString)
> val df = spark.range(0, 40, 1, 1).withColumn("large_str", long_string())
> spark.conf.set("parquet.enable.dictionary", "false")
> df.write.option("compression", 
> "uncompressed").mode("overwrite").parquet("/tmp/large.parquet"){code}
>  
> This Spark job fails with the exception:
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 0 in stage 10861.0 failed 4 times, most recent failure: Lost task 0.3 in 
> stage 10861.0 (TID 671168, 10.0.180.13, executor 14656): 
> org.apache.spark.SparkException: Task failed while writing rows. at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at 
> org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at 
> org.apache.spark.scheduler.Task.run(Task.scala:112) at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748) 
> Caused by: java.lang.IllegalArgumentException: Negative initial size: 
> -1610612543 at 
> java.io.ByteArrayOutputStream.(ByteArrayOutputStream.java:74) at 
> org.apache.parquet.bytes.BytesInput$BAOS.(BytesInput.java:234) at 
> org.apache.parquet.bytes.BytesInput$BAOS.(BytesInput.java:232) at 
> org.apache.parquet.bytes.BytesInput.toByteArray(BytesInput.java:202) at 
> org.apache.parquet.bytes.ConcatenatingByteArrayCollector.collect(ConcatenatingByteArrayCollector.java:33)
>  at 
> org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:126)
>  at 
> org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
>  at 
> org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235) 
> at 
> org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
>  at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
>  at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
>  at 
> org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
>  at 
> 

[jira] [Reopened] (PARQUET-1632) Negative initial size when writing large values in parquet-mr

2019-08-08 Thread Ivan Sadikov (JIRA)


 [ 
https://issues.apache.org/jira/browse/PARQUET-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Sadikov reopened PARQUET-1632:
---

Left a comment already. It looks like there is an issue of treating BytesInput 
as a byte array when it should be treated as a stream of bytes. Also, the bytes 
collector should gracefully handle adding large BytesInput instances.

 

The recommended configuration setting is the current mitigation, not a solution 
to the problem. Plus, it is about the page size, not the column chunk size (see 
the comment above).

> Negative initial size when writing large values in parquet-mr
> -
>
> Key: PARQUET-1632
> URL: https://issues.apache.org/jira/browse/PARQUET-1632
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Affects Versions: 1.10.1
>Reporter: Ivan Sadikov
>Assignee: Junjie Chen
>Priority: Major
>
> I encountered an issue when writing large string values to Parquet.
> Here is the code to reproduce the issue:
> {code:java}
> import org.apache.spark.sql.functions._
> def longString: String = "a" * (64 * 1024 * 1024)
> val long_string = udf(() => longString)
> val df = spark.range(0, 40, 1, 1).withColumn("large_str", long_string())
> spark.conf.set("parquet.enable.dictionary", "false")
> df.write.option("compression", 
> "uncompressed").mode("overwrite").parquet("/tmp/large.parquet"){code}
>  
> This Spark job fails with the exception:
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 0 in stage 10861.0 failed 4 times, most recent failure: Lost task 0.3 in 
> stage 10861.0 (TID 671168, 10.0.180.13, executor 14656): 
> org.apache.spark.SparkException: Task failed while writing rows. at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at 
> org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at 
> org.apache.spark.scheduler.Task.run(Task.scala:112) at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748) 
> Caused by: java.lang.IllegalArgumentException: Negative initial size: 
> -1610612543 at 
> java.io.ByteArrayOutputStream.(ByteArrayOutputStream.java:74) at 
> org.apache.parquet.bytes.BytesInput$BAOS.(BytesInput.java:234) at 
> org.apache.parquet.bytes.BytesInput$BAOS.(BytesInput.java:232) at 
> org.apache.parquet.bytes.BytesInput.toByteArray(BytesInput.java:202) at 
> org.apache.parquet.bytes.ConcatenatingByteArrayCollector.collect(ConcatenatingByteArrayCollector.java:33)
>  at 
> org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:126)
>  at 
> org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
>  at 
> org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235) 
> at 
> org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
>  at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
>  at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
>  at 
> org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
>  at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1560)
> 

[jira] [Comment Edited] (PARQUET-1632) Negative initial size when writing large values in parquet-mr

2019-08-08 Thread Ivan Sadikov (JIRA)


[ 
https://issues.apache.org/jira/browse/PARQUET-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16903082#comment-16903082
 ] 

Ivan Sadikov edited comment on PARQUET-1632 at 8/8/19 3:43 PM:
---

I don't think your assessment is correct. Yes, it overflows when we cast size() 
to an integer, even though the size can be a long.

It looks like the problem is in ConcatenatingByteArrayCollector and the 
toByteArray method. The ConcatenatingByteArrayCollector class should handle an 
arbitrary BytesInput by checking its size first and splitting the data across 
slabs (in this particular case even when the total size is larger than the i32 
max value) - I'd say that is the whole point of a concatenating byte array in 
the first place.

Note that the BytesInput instance itself did not fail with an overflow. It only 
failed when turning it into a byte array. If what you mentioned in the comment 
were true, I would expect a failure when creating the BytesInput instance.

The collector should be filling up the last slab and allocating new ones as it 
requests more and more data from BytesInput, similar to an output stream. 
BytesInput is treated like a stream based on the comments in the code, so this 
is a bug, IMHO.
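
A small sketch of the slab-splitting idea (illustration only, not proposed code; 
the {{slabSizes}} helper is made up):
{code:scala}
// Compute slab sizes for a payload whose total size is a Long, so no single
// contiguous allocation (and no Int cast of the total) is ever needed.
def slabSizes(totalSize: Long, maxSlab: Int): Seq[Int] = {
  require(totalSize >= 0 && maxSlab > 0)
  val full = (totalSize / maxSlab).toInt
  val last = (totalSize % maxSlab).toInt
  Seq.fill(full)(maxSlab) ++ (if (last > 0) Seq(last) else Nil)
}

// slabSizes(2684354560L, 1 << 30) == Seq(1073741824, 1073741824, 536870912)
{code}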

 

Regarding your comment about the page size check - it is merely a mitigation 
strategy that gets past the error. It is not a fix. I already included it 
in PARQUET-1633.

My understanding is that parquet.page.size.row.check.min affects the page size 
itself. If the page size were larger than 2GB, I would expect the code to 
fail here:

[https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java#L116]

or here:

[https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java#L123]

Those conditions explicitly check the page size, but the code did not fail 
there.


was (Author: sadikovi):
I don't think your assessment is correct. Yes, it overflows when we cast size() 
to an integer, even though the size can be a long.

It looks like the problem is in ConcatenatingByteArrayCollector and the 
toByteArray method. The ConcatenatingByteArrayCollector class should handle an 
arbitrary BytesInput by checking its size first and splitting the data across 
slabs (in this particular case even when the total size is larger than the i32 
max value) - I'd say that is the whole point of a concatenating byte array in 
the first place.

Note that the BytesInput instance itself did not fail with an overflow. It only 
failed when turning it into a byte array. If what you mentioned in the comment 
were true, I would expect a failure when creating the BytesInput instance.

The collector should be filling up the last slab and allocating new ones as it 
requests more and more data from BytesInput, similar to an output stream.

 

Regarding your comment about the page size check - it is merely a mitigation 
strategy that gets past the error. It is not a fix. I already included it 
in PARQUET-1633.

My understanding is that parquet.page.size.row.check.min affects the page size 
itself. If the page size were larger than 2GB, I would expect the code to 
fail here:

[https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java#L116]

or here:

[https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java#L123]

Those conditions explicitly check the page size, but the code did not fail 
there.

> Negative initial size when writing large values in parquet-mr
> -
>
> Key: PARQUET-1632
> URL: https://issues.apache.org/jira/browse/PARQUET-1632
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Affects Versions: 1.10.1
>Reporter: Ivan Sadikov
>Assignee: Junjie Chen
>Priority: Major
>
> I encountered an issue when writing large string values to Parquet.
> Here is the code to reproduce the issue:
> {code:java}
> import org.apache.spark.sql.functions._
> def longString: String = "a" * (64 * 1024 * 1024)
> val long_string = udf(() => longString)
> val df = spark.range(0, 40, 1, 1).withColumn("large_str", long_string())
> spark.conf.set("parquet.enable.dictionary", "false")
> df.write.option("compression", 
> "uncompressed").mode("overwrite").parquet("/tmp/large.parquet"){code}
>  
> This Spark job fails with the exception:
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 0 in stage 10861.0 failed 4 times, most recent failure: Lost task 0.3 in 
> stage 10861.0 (TID 671168, 10.0.180.13, executor 14656): 
> org.apache.spark.SparkException: Task failed while writing rows. at 
> 

[jira] [Commented] (PARQUET-1632) Negative initial size when writing large values in parquet-mr

2019-08-08 Thread Ivan Sadikov (JIRA)


[ 
https://issues.apache.org/jira/browse/PARQUET-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16903082#comment-16903082
 ] 

Ivan Sadikov commented on PARQUET-1632:
---

I don't think your assessment is correct. Yes, it overflows when we cast size() 
to an integer, even though the size can be a long.

It looks like the problem is in ConcatenatingByteArrayCollector and the 
toByteArray method. The ConcatenatingByteArrayCollector class should handle an 
arbitrary BytesInput by checking its size first and splitting the data across 
slabs (in this particular case even when the total size is larger than the i32 
max value) - I'd say that is the whole point of a concatenating byte array in 
the first place.

Note that the BytesInput instance itself did not fail with an overflow. It only 
failed when turning it into a byte array. If what you mentioned in the comment 
were true, I would expect a failure when creating the BytesInput instance.

The collector should be filling up the last slab and allocating new ones as it 
requests more and more data from BytesInput, similar to an output stream.

 

Regarding your comment about the page size check - it is merely a mitigation 
strategy that gets past the error. It is not a fix. I already included it 
in PARQUET-1633.

My understanding is that parquet.page.size.row.check.min affects the page size 
itself. If the page size were larger than 2GB, I would expect the code to 
fail here:

[https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java#L116]

or here:

[https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java#L123]

Those conditions explicitly check the page size, but the code did not fail 
there.

> Negative initial size when writing large values in parquet-mr
> -
>
> Key: PARQUET-1632
> URL: https://issues.apache.org/jira/browse/PARQUET-1632
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Affects Versions: 1.10.1
>Reporter: Ivan Sadikov
>Assignee: Junjie Chen
>Priority: Major
>
> I encountered an issue when writing large string values to Parquet.
> Here is the code to reproduce the issue:
> {code:java}
> import org.apache.spark.sql.functions._
> def longString: String = "a" * (64 * 1024 * 1024)
> val long_string = udf(() => longString)
> val df = spark.range(0, 40, 1, 1).withColumn("large_str", long_string())
> spark.conf.set("parquet.enable.dictionary", "false")
> df.write.option("compression", 
> "uncompressed").mode("overwrite").parquet("/tmp/large.parquet"){code}
>  
> This Spark job fails with the exception:
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 0 in stage 10861.0 failed 4 times, most recent failure: Lost task 0.3 in 
> stage 10861.0 (TID 671168, 10.0.180.13, executor 14656): 
> org.apache.spark.SparkException: Task failed while writing rows. at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at 
> org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at 
> org.apache.spark.scheduler.Task.run(Task.scala:112) at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748) 
> Caused by: java.lang.IllegalArgumentException: Negative initial size: 
> -1610612543 at 
> java.io.ByteArrayOutputStream.(ByteArrayOutputStream.java:74) at 
> org.apache.parquet.bytes.BytesInput$BAOS.(BytesInput.java:234) at 
> org.apache.parquet.bytes.BytesInput$BAOS.(BytesInput.java:232) at 
> org.apache.parquet.bytes.BytesInput.toByteArray(BytesInput.java:202) at 
> org.apache.parquet.bytes.ConcatenatingByteArrayCollector.collect(ConcatenatingByteArrayCollector.java:33)
>  at 
> org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:126)
>  at 
> org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
>  at 
> 

[jira] [Updated] (PARQUET-1632) Negative initial size when writing large values in parquet-mr

2019-08-08 Thread Ivan Sadikov (JIRA)


 [ 
https://issues.apache.org/jira/browse/PARQUET-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Sadikov updated PARQUET-1632:
--
Description: 
I encountered an issue when writing large string values to Parquet.

Here is the code to reproduce the issue:
{code:java}
import org.apache.spark.sql.functions._

def longString: String = "a" * (64 * 1024 * 1024)
val long_string = udf(() => longString)

val df = spark.range(0, 40, 1, 1).withColumn("large_str", long_string())

spark.conf.set("parquet.enable.dictionary", "false")
df.write.option("compression", 
"uncompressed").mode("overwrite").parquet("/tmp/large.parquet"){code}
 

This Spark job fails with the exception:
{code:java}
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 10861.0 failed 4 times, most recent failure: Lost task 0.3 in 
stage 10861.0 (TID 671168, 10.0.180.13, executor 14656): 
org.apache.spark.SparkException: Task failed while writing rows. at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at 
org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at 
org.apache.spark.scheduler.Task.run(Task.scala:112) at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) 

Caused by: java.lang.IllegalArgumentException: Negative initial size: 
-1610612543 at 
java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:74) at 
org.apache.parquet.bytes.BytesInput$BAOS.<init>(BytesInput.java:234) at 
org.apache.parquet.bytes.BytesInput$BAOS.<init>(BytesInput.java:232) at 
org.apache.parquet.bytes.BytesInput.toByteArray(BytesInput.java:202) at 
org.apache.parquet.bytes.ConcatenatingByteArrayCollector.collect(ConcatenatingByteArrayCollector.java:33)
 at 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:126)
 at 
org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
 at 
org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235) at 
org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
 at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
 at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
 at 
org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
 at 
org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
 at 
org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
 at 
org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
 at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1560)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
 ... 11 more{code}
 

I would appreciate it if you could help address the problem. Thanks!

  was:
I encountered an issue when writing large string values to Parquet.

Here is the code to reproduce the issue:
{code:java}
import org.apache.spark.sql.functions._

def longString: String = "a" * (64 * 1024 * 1024)
val long_string = udf(() => longString)

val df = spark.range(0, 40, 1, 1).withColumn("large_str", long_string())

spark.conf.set("parquet.enable.dictionary", "false")
df.write.option("compression", 
"uncompressed").mode("overwrite").parquet("/tmp/large.parquet"){code}
 

This Spark job fails with the exception:
{code:java}
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 10861.0 failed 4 times, most recent failure: Lost task 0.3 in 
stage 10861.0 (TID 

[jira] [Commented] (PARQUET-1632) Negative initial size when writing large values in parquet-mr

2019-08-07 Thread Ivan Sadikov (JIRA)


[ 
https://issues.apache.org/jira/browse/PARQUET-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16902098#comment-16902098
 ] 

Ivan Sadikov commented on PARQUET-1632:
---

Thank you.

> Negative initial size when writing large values in parquet-mr
> -
>
> Key: PARQUET-1632
> URL: https://issues.apache.org/jira/browse/PARQUET-1632
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Affects Versions: 1.10.1
>Reporter: Ivan Sadikov
>Assignee: Junjie Chen
>Priority: Major
>
> I encountered an issue when writing large string values to Parquet.
> Here is the code to reproduce the issue:
> {code:java}
> import org.apache.spark.sql.functions._
> def longString: String = "a" * (64 * 1024 * 1024)
> val long_string = udf(() => longString)
> val df = spark.range(0, 40, 1, 1).withColumn("large_str", long_string())
> spark.conf.set("parquet.enable.dictionary", "false")
> df.write.option("compression", 
> "uncompressed").mode("overwrite").parquet("/tmp/large.parquet"){code}
>  
> This Spark job fails with the exception:
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 0 in stage 10861.0 failed 4 times, most recent failure: Lost task 0.3 in 
> stage 10861.0 (TID 671168, 10.0.180.13, executor 14656): 
> org.apache.spark.SparkException: Task failed while writing rows. at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at 
> org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at 
> org.apache.spark.scheduler.Task.run(Task.scala:112) at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748) Caused by: 
> java.lang.IllegalArgumentException: Negative initial size: -1610612543 at 
> java.io.ByteArrayOutputStream.(ByteArrayOutputStream.java:74) at 
> org.apache.parquet.bytes.BytesInput$BAOS.(BytesInput.java:234) at 
> org.apache.parquet.bytes.BytesInput$BAOS.(BytesInput.java:232) at 
> org.apache.parquet.bytes.BytesInput.toByteArray(BytesInput.java:202) at 
> org.apache.parquet.bytes.ConcatenatingByteArrayCollector.collect(ConcatenatingByteArrayCollector.java:33)
>  at 
> org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:126)
>  at 
> org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
>  at 
> org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235) 
> at 
> org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
>  at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
>  at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
>  at 
> org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
>  at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
>  at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1560)
>  at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
>  ... 11 more{code}
>  
> Would appreciate if you could help addressing the problem. Thanks!



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (PARQUET-1633) Integer overflow in ParquetFileReader.ConsecutiveChunkList

2019-08-07 Thread Ivan Sadikov (JIRA)


[ 
https://issues.apache.org/jira/browse/PARQUET-1633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16902096#comment-16902096
 ] 

Ivan Sadikov commented on PARQUET-1633:
---

I would say it is a bug. There are a few instances of overflow I found in 
parquet-mr, another example being 
https://issues.apache.org/jira/browse/PARQUET-1632.

I also have a simple repro with Spark; by the way, the problem still persists 
even if you make the records shorter:
{code:java}
import org.apache.spark.sql.functions._

val large_str = udf(() => "a" * (128 * 1024 * 1024))

val df = spark.range(0, 20, 1, 1).withColumn("large_str", large_str())

spark.conf.set("parquet.enable.dictionary", "false")
spark.conf.set("parquet.page.size.row.check.min", "1") // this is done so I 
don't hit PARQUET-1632
df.write.option("compression", 
"uncompressed").mode("overwrite").parquet("/mnt/large.parquet")

spark.read.parquet("/mnt/large.parquet").foreach(_ => Unit) // Fails{code}
Here is the stacktrace:
{code:java}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in 
stage 22.0 failed 4 times, most recent failure: Lost task 10.3 in stage 22.0 
(TID 121, 10.0.217.97, executor 1): java.lang.IllegalArgumentException: Illegal 
Capacity: -191
at java.util.ArrayList.<init>(ArrayList.java:157) at 
org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1169)
 at 
org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:811){code}
 

Well, there is no restriction on row group size in the Parquet format. Also, it 
is not a significant effort to patch this issue - changing the ChunkDescriptor 
size field as well as the ConsecutiveChunkList length to the long type should 
fix this problem (the size already has the long type in the metadata).
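
A sketch of the proposed widening (my illustration, not a patch; the 
{{numAllocations}} helper is made up and the 8 MiB figure assumes the default 
max allocation size):
{code:scala}
// Keep the total chunk length as a Long and only narrow to Int per allocation,
// so the allocation count is computed without an overflowing cast.
def numAllocations(length: Long, maxAllocationSize: Int): Int = {
  val full = length / maxAllocationSize
  val last = length % maxAllocationSize
  (full + (if (last > 0) 1 else 0)).toInt
}

// numAllocations(3243165541L, 8 * 1024 * 1024) == 387 (block 2's totalByteSize)
{code}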

> Integer overflow in ParquetFileReader.ConsecutiveChunkList
> --
>
> Key: PARQUET-1633
> URL: https://issues.apache.org/jira/browse/PARQUET-1633
> Project: Parquet
>  Issue Type: Bug
>  Components: parquet-mr
>Affects Versions: 1.10.1
>Reporter: Ivan Sadikov
>Priority: Major
>
> When reading a large Parquet file (2.8GB), I encounter the following 
> exception:
> {code:java}
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value 
> at 0 in block -1 in file 
> dbfs:/user/hive/warehouse/demo.db/test_table/part-00014-tid-1888470069989036737-593c82a4-528b-4975-8de0-5bcbc5e9827d-10856-1-c000.snappy.parquet
> at 
> org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
> at 
> org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
> at 
> org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:40)
> at 
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:228)
> ... 14 more
> Caused by: java.lang.IllegalArgumentException: Illegal Capacity: -212
> at java.util.ArrayList.(ArrayList.java:157)
> at 
> org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1169){code}
>  
> The file metadata is:
>  * block 1 (3 columns)
>  ** rowCount: 110,100
>  ** totalByteSize: 348,492,072
>  ** compressedSize: 165,689,649
>  * block 2 (3 columns)
>  ** rowCount: 90,054
>  ** totalByteSize: 3,243,165,541
>  ** compressedSize: 2,509,579,966
>  * block 3 (3 columns)
>  ** rowCount: 105,119
>  ** totalByteSize: 350,901,693
>  ** compressedSize: 144,952,177
>  * block 4 (3 columns)
>  ** rowCount: 48,741
>  ** totalByteSize: 1,275,995
>  ** compressedSize: 914,205
> I don't have the code to reproduce the issue, unfortunately; however, I 
> looked at the code and it seems that integer {{length}} field in 
> ConsecutiveChunkList overflows, which results in negative capacity for array 
> list in {{readAll}} method:
> {code:java}
> int fullAllocations = length / options.getMaxAllocationSize();
> int lastAllocationSize = length % options.getMaxAllocationSize();
>   
> int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
> List<ByteBuffer> buffers = new ArrayList<>(numAllocations);{code}
>  
> This is caused by cast to integer in {{readNextRowGroup}} method in 
> ParquetFileReader:
> {code:java}
> currentChunks.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, 
> (int)mc.getTotalSize()));
> {code}
> which overflows when total size of the column is larger than 
> Integer.MAX_VALUE.
> I would appreciate if you could help addressing the issue. Thanks!
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (PARQUET-1632) Negative initial size when writing large values in parquet-mr

2019-08-06 Thread Ivan Sadikov (JIRA)


 [ 
https://issues.apache.org/jira/browse/PARQUET-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Sadikov updated PARQUET-1632:
--
Description: 
I encountered an issue when writing large string values to Parquet.

Here is the code to reproduce the issue:
{code:java}
import org.apache.spark.sql.functions._

def longString: String = "a" * (64 * 1024 * 1024)
val long_string = udf(() => longString)

val df = spark.range(0, 40, 1, 1).withColumn("large_str", long_string())

spark.conf.set("parquet.enable.dictionary", "false")
df.write.option("compression", 
"uncompressed").mode("overwrite").parquet("/tmp/large.parquet"){code}
 

This Spark job fails with the exception:
{code:java}
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 10861.0 failed 4 times, most recent failure: Lost task 0.3 in 
stage 10861.0 (TID 671168, 10.0.180.13, executor 14656): 
org.apache.spark.SparkException: Task failed while writing rows. at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at 
org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at 
org.apache.spark.scheduler.Task.run(Task.scala:112) at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) Caused by: 
java.lang.IllegalArgumentException: Negative initial size: -1610612543 at 
java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:74) at 
org.apache.parquet.bytes.BytesInput$BAOS.<init>(BytesInput.java:234) at 
org.apache.parquet.bytes.BytesInput$BAOS.<init>(BytesInput.java:232) at 
org.apache.parquet.bytes.BytesInput.toByteArray(BytesInput.java:202) at 
org.apache.parquet.bytes.ConcatenatingByteArrayCollector.collect(ConcatenatingByteArrayCollector.java:33)
 at 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:126)
 at 
org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
 at 
org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235) at 
org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
 at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
 at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
 at 
org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
 at 
org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
 at 
org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
 at 
org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
 at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1560)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
 ... 11 more{code}
 

I would appreciate it if you could help address the problem. Thanks!

  was:
I encountered an issue when writing large string values to Parquet.

Here is the code to reproduce the issue:
{code:java}
import org.apache.spark.sql.functions._

def longString: String = "a" * (64 * 1024 * 1024)
val long_string = udf(() => longString)

val df = spark.range(0, 40, 1, 1).withColumn("large_str", long_string())

spark.conf.set("parquet.enable.dictionary", "false")
df.write.option("compression", 
"uncompressed").mode("overwrite").parquet("/tmp/large.parquet"){code}
 

This Spark job fails with the exception:
{code:java}
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 10861.0 failed 4 times, most recent failure: Lost task 0.3 in 
stage 10861.0 (TID 

[jira] [Created] (PARQUET-1633) Integer overflow in ParquetFileReader.ConsecutiveChunkList

2019-08-06 Thread Ivan Sadikov (JIRA)
Ivan Sadikov created PARQUET-1633:
-

 Summary: Integer overflow in ParquetFileReader.ConsecutiveChunkList
 Key: PARQUET-1633
 URL: https://issues.apache.org/jira/browse/PARQUET-1633
 Project: Parquet
  Issue Type: Bug
  Components: parquet-mr
Affects Versions: 1.10.1
Reporter: Ivan Sadikov


When reading a large Parquet file (2.8GB), I encounter the following exception:
{code:java}
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value 
at 0 in block -1 in file 
dbfs:/user/hive/warehouse/demo.db/test_table/part-00014-tid-1888470069989036737-593c82a4-528b-4975-8de0-5bcbc5e9827d-10856-1-c000.snappy.parquet
at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
at 
org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
at 
org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:40)
at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:228)
... 14 more
Caused by: java.lang.IllegalArgumentException: Illegal Capacity: -212
at java.util.ArrayList.<init>(ArrayList.java:157)
at 
org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1169){code}
 

The file metadata is:
 * block 1 (3 columns)
 ** rowCount: 110,100
 ** totalByteSize: 348,492,072
 ** compressedSize: 165,689,649
 * block 2 (3 columns)
 ** rowCount: 90,054
 ** totalByteSize: 3,243,165,541
 ** compressedSize: 2,509,579,966
 * block 3 (3 columns)
 ** rowCount: 105,119
 ** totalByteSize: 350,901,693
 ** compressedSize: 144,952,177
 * block 4 (3 columns)
 ** rowCount: 48,741
 ** totalByteSize: 1,275,995
 ** compressedSize: 914,205

I don't have the code to reproduce the issue, unfortunately; however, I looked 
at the code and it seems that the integer {{length}} field in ConsecutiveChunkList 
overflows, which results in a negative capacity for the array list in the 
{{readAll}} method:
{code:java}
int fullAllocations = length / options.getMaxAllocationSize();
int lastAllocationSize = length % options.getMaxAllocationSize();

int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
List<ByteBuffer> buffers = new ArrayList<>(numAllocations);{code}
 

This is caused by a cast to integer in the {{readNextRowGroup}} method in 
ParquetFileReader:
{code:java}
currentChunks.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, 
(int)mc.getTotalSize()));
{code}
which overflows when total size of the column is larger than Integer.MAX_VALUE.
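
For reference, the arithmetic behind the reported capacity of -212, assuming the 
consecutive chunk list covers block 2 and the default 8 MiB max allocation size 
(both are assumptions, since the per-chunk breakdown is unknown):
{code:scala}
val length: Int = 2509579966L.toInt      // block 2 compressedSize wraps to -1785387330
val maxAllocationSize = 8 * 1024 * 1024  // 8388608, assumed default
println(length / maxAllocationSize)      // -212
{code}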

I would appreciate it if you could help address the issue. Thanks!

 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (PARQUET-1632) Negative initial size when writing large values in parquet-mr

2019-08-06 Thread Ivan Sadikov (JIRA)
Ivan Sadikov created PARQUET-1632:
-

 Summary: Negative initial size when writing large values in 
parquet-mr
 Key: PARQUET-1632
 URL: https://issues.apache.org/jira/browse/PARQUET-1632
 Project: Parquet
  Issue Type: Bug
  Components: parquet-mr
Affects Versions: 1.10.1
Reporter: Ivan Sadikov


I encountered an issue when writing large string values to Parquet.

Here is the code to reproduce the issue:
{code:java}
import org.apache.spark.sql.functions._

def longString: String = "a" * (64 * 1024 * 1024)
val long_string = udf(() => longString)

val df = spark.range(0, 40, 1, 1).withColumn("large_str", long_string())

spark.conf.set("parquet.enable.dictionary", "false")
df.write.option("compression", 
"uncompressed").mode("overwrite").parquet("/tmp/large.parquet"){code}
 

This Spark job fails with the exception:
{code:java}
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 10861.0 failed 4 times, most recent failure: Lost task 0.3 in 
stage 10861.0 (TID 671168, 10.0.180.13, executor 14656): 
org.apache.spark.SparkException: Task failed while writing rows. at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at 
org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at 
org.apache.spark.scheduler.Task.run(Task.scala:112) at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) Caused by: 
java.lang.IllegalArgumentException: Negative initial size: -1610612543 at 
java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:74) at 
org.apache.parquet.bytes.BytesInput$BAOS.<init>(BytesInput.java:234) at 
org.apache.parquet.bytes.BytesInput$BAOS.<init>(BytesInput.java:232) at 
org.apache.parquet.bytes.BytesInput.toByteArray(BytesInput.java:202) at 
org.apache.parquet.bytes.ConcatenatingByteArrayCollector.collect(ConcatenatingByteArrayCollector.java:33)
 at 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:126)
 at 
org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
 at 
org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235) at 
org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
 at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
 at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
 at 
org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
 at 
org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
 at 
org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
 at 
org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
 at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1560)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
 ... 11 more{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (PARQUET-458) [C++] Implement support for DataPageV2

2019-07-02 Thread Ivan Sadikov (JIRA)


[ 
https://issues.apache.org/jira/browse/PARQUET-458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16876738#comment-16876738
 ] 

Ivan Sadikov commented on PARQUET-458:
--

Hello,

If it helps, the Rust Parquet library in Apache Arrow already implements Data 
Page V2 for both writes and reads, so you can use it as a reference when working 
on the C++ code and cross-check correctness between Rust and C++.

Here are some links for column reader/writer:

https://github.com/apache/arrow/blob/master/rust/parquet/src/column/reader.rs#L342
https://github.com/apache/arrow/blob/master/rust/parquet/src/column/writer.rs#L544

We also added a number of Data Page V2 encodings in Rust, along with quite a 
few correctness tests to check that encoding and decoding work.

Hopefully, it helps, [~wesmckinn].

> [C++] Implement support for DataPageV2
> --
>
> Key: PARQUET-458
> URL: https://issues.apache.org/jira/browse/PARQUET-458
> Project: Parquet
>  Issue Type: New Feature
>  Components: parquet-cpp
>Reporter: Wes McKinney
>Assignee: Wes McKinney
>Priority: Minor
> Fix For: cpp-1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (PARQUET-750) Parquet tools cat/head parity, small fixes

2016-10-14 Thread Ivan Sadikov (JIRA)

[ 
https://issues.apache.org/jira/browse/PARQUET-750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15576394#comment-15576394
 ] 

Ivan Sadikov commented on PARQUET-750:
--

PR: https://github.com/apache/parquet-mr/pull/378

> Parquet tools cat/head parity, small fixes
> --
>
> Key: PARQUET-750
> URL: https://issues.apache.org/jira/browse/PARQUET-750
> Project: Parquet
>  Issue Type: Bug
>Reporter: Ivan Sadikov
>Priority: Trivial
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (PARQUET-750) Parquet tools cat/head parity, small fixes

2016-10-14 Thread Ivan Sadikov (JIRA)
Ivan Sadikov created PARQUET-750:


 Summary: Parquet tools cat/head parity, small fixes
 Key: PARQUET-750
 URL: https://issues.apache.org/jira/browse/PARQUET-750
 Project: Parquet
  Issue Type: Bug
Reporter: Ivan Sadikov
Priority: Trivial






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (PARQUET-748) Update assembly for parquet tools

2016-10-14 Thread Ivan Sadikov (JIRA)

 [ 
https://issues.apache.org/jira/browse/PARQUET-748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Sadikov resolved PARQUET-748.
--
Resolution: Won't Fix

> Update assembly for parquet tools
> -
>
> Key: PARQUET-748
> URL: https://issues.apache.org/jira/browse/PARQUET-748
> Project: Parquet
>  Issue Type: Improvement
>Reporter: Ivan Sadikov
>Priority: Trivial
>
> Small update to the {{parquet-tools}} module to be able to create a distribution 
> including shell scripts and all dependencies. Also a documentation update.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (PARQUET-748) Update assembly for parquet tools

2016-10-11 Thread Ivan Sadikov (JIRA)
Ivan Sadikov created PARQUET-748:


 Summary: Update assembly for parquet tools
 Key: PARQUET-748
 URL: https://issues.apache.org/jira/browse/PARQUET-748
 Project: Parquet
  Issue Type: Improvement
Reporter: Ivan Sadikov
Priority: Trivial


Small update to the {{parquet-tools}} module to be able to create a distribution 
including shell scripts and all dependencies. Also a documentation update.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (PARQUET-748) Update assembly for parquet tools

2016-10-11 Thread Ivan Sadikov (JIRA)

[ 
https://issues.apache.org/jira/browse/PARQUET-748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15566544#comment-15566544
 ] 

Ivan Sadikov commented on PARQUET-748:
--

I will work on this; it is fairly trivial and will help me get a little familiar 
with the code base.

> Update assembly for parquet tools
> -
>
> Key: PARQUET-748
> URL: https://issues.apache.org/jira/browse/PARQUET-748
> Project: Parquet
>  Issue Type: Improvement
>Reporter: Ivan Sadikov
>Priority: Trivial
>
> Small update to the {{parquet-tools}} module to be able to create a distribution 
> including shell scripts and all dependencies. Also a documentation update.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)