[jira] [Commented] (PARQUET-2170) Empty projection returns the wrong number of rows when column index is enabled
[ https://issues.apache.org/jira/browse/PARQUET-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17575562#comment-17575562 ] Ivan Sadikov commented on PARQUET-2170: --- I will update the description later, and I would like to open a PR to fix the issue. I think we just need to check whether the column set is empty when checking paths in the ColumnIndexFilter, but I will need to confirm this. > Empty projection returns the wrong number of rows when column index is enabled > -- > > Key: PARQUET-2170 > URL: https://issues.apache.org/jira/browse/PARQUET-2170 > Project: Parquet > Issue Type: Bug > Components: parquet-mr > Reporter: Ivan Sadikov > Priority: Major > > Discovered in Spark: when returning an empty projection from a Parquet file > with filter pushdown enabled (typically when doing filter + count), > parquet-mr returns the wrong number of rows when the column index is enabled. When the > column index feature is disabled, the result is correct. > > This happens due to the following: > # ParquetFileReader::getFilteredRowCount() > ([https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L851]) > selects row ranges to calculate the row count when the column index is enabled. > # In ColumnIndexFilter > ([https://github.com/apache/parquet-mr/blob/0819356a9dafd2ca07c5eab68e2bffeddc3bd3d9/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java#L80]) > we filter row ranges and pass the set of paths, which in this case is empty.
> # When evaluating the filter, if the column path is not in the set, we > return an empty list of rows > ([https://github.com/apache/parquet-mr/blob/0819356a9dafd2ca07c5eab68e2bffeddc3bd3d9/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java#L178]), > which is always the case for an empty projection. > # This results in an incorrect number of records reported by the library. > > I will provide the full repro later. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
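The guard proposed in the comment can be illustrated as follows. This is a minimal, hypothetical sketch, not the actual parquet-mr code: `matchingRows`, the set of projected paths, and the plain row count stand in for `ColumnIndexFilter`'s real `RowRanges` machinery.

```java
import java.util.Collections;
import java.util.Set;

public class EmptyProjectionGuard {
    // Simulates evaluating a filter column against the projection. A path not
    // present in the set yields an empty range (the behavior referenced at
    // ColumnIndexFilter.java#L178), which is always the case when the set is
    // empty - hence the wrong count of 0 for filter + count queries.
    static long matchingRows(Set<String> projectedPaths, String filterColumn, long rowGroupRowCount) {
        if (projectedPaths.isEmpty()) {
            // Proposed guard: an empty projection means there are no column
            // indexes to consult, so keep every row instead of none.
            return rowGroupRowCount;
        }
        // Simplified: a projected column matches all rows, a missing one none.
        return projectedPaths.contains(filterColumn) ? rowGroupRowCount : 0L;
    }

    public static void main(String[] args) {
        // filter + count with an empty projection: expect the full row count.
        System.out.println(matchingRows(Collections.emptySet(), "id", 100L)); // prints 100
    }
}
```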
[jira] [Created] (PARQUET-2170) Empty projection returns the wrong number of rows when column index is enabled
Ivan Sadikov created PARQUET-2170: - Summary: Empty projection returns the wrong number of rows when column index is enabled Key: PARQUET-2170 URL: https://issues.apache.org/jira/browse/PARQUET-2170 Project: Parquet Issue Type: Bug Components: parquet-mr Reporter: Ivan Sadikov
[jira] [Commented] (PARQUET-1632) Negative initial size when writing large values in parquet-mr
[ https://issues.apache.org/jira/browse/PARQUET-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16903899#comment-16903899 ] Ivan Sadikov commented on PARQUET-1632: --- Actually, setting the page.size configuration to smaller values results in PARQUET-1633, where one can't read the file one has written. Yes, I see what you mean: arrayOutput overflowed, which made the size negative; it passed the max page size check in the writePage method and instead failed when running buf.collect. > Negative initial size when writing large values in parquet-mr > - > > Key: PARQUET-1632 > URL: https://issues.apache.org/jira/browse/PARQUET-1632 > Project: Parquet > Issue Type: Bug > Components: parquet-mr > Affects Versions: 1.10.1 > Reporter: Ivan Sadikov > Assignee: Junjie Chen > Priority: Major > > I encountered an issue when writing large string values to Parquet. > Here is the code to reproduce the issue: > {code:java} > import org.apache.spark.sql.functions._ > def longString: String = "a" * (64 * 1024 * 1024) > val long_string = udf(() => longString) > val df = spark.range(0, 40, 1, 1).withColumn("large_str", long_string()) > spark.conf.set("parquet.enable.dictionary", "false") > df.write.option("compression", > "uncompressed").mode("overwrite").parquet("/tmp/large.parquet"){code} > > This Spark job fails with the exception: > {code:java} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 0 in stage 10861.0 failed 4 times, most recent failure: Lost task 0.3 in > stage 10861.0 (TID 671168, 10.0.180.13, executor 14656): > org.apache.spark.SparkException: Task failed while writing rows. 
at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at > org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at > org.apache.spark.scheduler.Task.run(Task.scala:112) at > org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526) at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.IllegalArgumentException: Negative initial size: > -1610612543 at > java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:74) at > org.apache.parquet.bytes.BytesInput$BAOS.<init>(BytesInput.java:234) at > org.apache.parquet.bytes.BytesInput$BAOS.<init>(BytesInput.java:232) at > org.apache.parquet.bytes.BytesInput.toByteArray(BytesInput.java:202) at > org.apache.parquet.bytes.ConcatenatingByteArrayCollector.collect(ConcatenatingByteArrayCollector.java:33) > at > org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:126) > at > org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147) > at > org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235) > at > org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122) > at > 
org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172) > at > org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114) > at > org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42) > at > org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57) > at > org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242) > at > org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1560) > at >
[jira] [Comment Edited] (PARQUET-1632) Negative initial size when writing large values in parquet-mr
[ https://issues.apache.org/jira/browse/PARQUET-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16903092#comment-16903092 ] Ivan Sadikov edited comment on PARQUET-1632 at 8/8/19 3:57 PM: --- Left a comment already. It looks like there is an issue of treating BytesInput as a byte array when it should be treated as a stream of bytes. Also, the bytes collector should gracefully handle adding large BytesInput instances. The recommended configuration setting is the current mitigation, not a solution to the problem. Plus, the config is about tweaking the page size, not the column chunk size (see the comment above). was (Author: sadikovi): Left a comment already. It looks like there is an issue of treating BytesInput as a byte array when it should be treated as a stream of bytes. Also, the bytes collector should gracefully handle adding large BytesInput instances. The recommended configuration setting is the current mitigation, not a solution to the problem. Plus, it is about page size, not column chunk size (see the comment above).
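The slab-splitting behavior the comment argues for can be sketched like this. It is an illustrative stand-in, assuming a plain `InputStream` source instead of the real `BytesInput`/`ConcatenatingByteArrayCollector` API: content is copied into fixed-size slabs, and the total length is tracked as a `long`, so no single contiguous allocation is ever required, even past `Integer.MAX_VALUE`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

public class SlabCollector {
    private final int slabSize;
    private final List<byte[]> slabs = new ArrayList<>();

    SlabCollector(int slabSize) {
        this.slabSize = slabSize;
    }

    // Copies the stream into fixed-size slabs. The total is a long, so inputs
    // larger than Integer.MAX_VALUE never force one contiguous byte[] (the
    // allocation that goes negative in the stack trace above).
    long collect(InputStream in) throws IOException {
        long total = 0;
        byte[] slab = new byte[slabSize];
        int filled = 0;
        int n;
        while ((n = in.read(slab, filled, slabSize - filled)) != -1) {
            filled += n;
            total += n;
            if (filled == slabSize) { // slab full: seal it, start a new one
                slabs.add(slab);
                slab = new byte[slabSize];
                filled = 0;
            }
        }
        if (filled > 0) { // seal the partially filled last slab
            byte[] last = new byte[filled];
            System.arraycopy(slab, 0, last, 0, filled);
            slabs.add(last);
        }
        return total;
    }

    int slabCount() {
        return slabs.size();
    }

    public static void main(String[] args) throws IOException {
        SlabCollector collector = new SlabCollector(4);
        long total = collector.collect(new ByteArrayInputStream(new byte[10]));
        System.out.println(total + " bytes in " + collector.slabCount() + " slabs"); // 10 bytes in 3 slabs
    }
}
```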
[jira] [Reopened] (PARQUET-1632) Negative initial size when writing large values in parquet-mr
[ https://issues.apache.org/jira/browse/PARQUET-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Sadikov reopened PARQUET-1632: --- Left a comment already. It looks like there is an issue of treating BytesInput as a byte array when it should be treated as a stream of bytes. Also, the bytes collector should gracefully handle adding large BytesInput instances. The recommended configuration setting is the current mitigation, not a solution to the problem. Plus, it is about page size, not column chunk size (see the comment above).
[jira] [Comment Edited] (PARQUET-1632) Negative initial size when writing large values in parquet-mr
[ https://issues.apache.org/jira/browse/PARQUET-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16903082#comment-16903082 ] Ivan Sadikov edited comment on PARQUET-1632 at 8/8/19 3:43 PM: --- I don't think your assessment is correct. Yes, it overflows when we cast size() to an integer, even though the size can be a long. It looks like the problem is ConcatenatingByteArrayCollector and the toByteArray method. The ConcatenatingByteArrayCollector class should handle arbitrary BytesInput by checking its size first and splitting the data across slabs (in this particular case even when the total size is larger than the i32 max value) - I'd say that is the whole point of a concatenating byte array in the first place. Note that the BytesInput instance itself did not fail with an overflow; it only failed when being turned into a byte array. If what you mentioned in the comment were true, I would expect a failure when creating an instance of BytesInput. The collector should be filling up the last slab and allocating new ones as it requests more and more data from BytesInput, similar to an output stream. BytesInput is treated like a stream based on comments in the code, so it is a bug, IMHO. Regarding your comment around the page size check - it is merely a mitigation strategy that gets past the error; it is not a fix. I already included it in PARQUET-1633. My understanding is that parquet.page.size.row.check.min affects the page size itself. If the page size were larger than 2GB, I would expect the code to fail here: [https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java#L116] or here: [https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java#L123] Those conditions explicitly check the page size, but the code did not fail there. was (Author: sadikovi): I don't think your assessment is correct. Yes, it overflows when we cast size() to an integer, even though the size can be a long. It looks like the problem is ConcatenatingByteArrayCollector and the toByteArray method. The ConcatenatingByteArrayCollector class should handle arbitrary BytesInput by checking its size first and splitting the data across slabs (in this particular case even when the total size is larger than the i32 max value) - I'd say that is the whole point of a concatenating byte array in the first place. Note that the BytesInput instance itself did not fail with an overflow; it only failed when being turned into a byte array. If what you mentioned in the comment were true, I would expect a failure when creating an instance of BytesInput. The collector should be filling up the last slab and allocating new ones as it requests more and more data from BytesInput, similar to an output stream. Regarding your comment around the page size check - it is merely a mitigation strategy that gets past the error; it is not a fix. I already included it in PARQUET-1633. My understanding is that parquet.page.size.row.check.min affects the page size itself. If the page size were larger than 2GB, I would expect the code to fail here: [https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java#L116] or here: [https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java#L123] Those conditions explicitly check the page size, but the code did not fail there.
[jira] [Commented] (PARQUET-1632) Negative initial size when writing large values in parquet-mr
[ https://issues.apache.org/jira/browse/PARQUET-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16903082#comment-16903082 ] Ivan Sadikov commented on PARQUET-1632: --- I don't think your assessment is correct. Yes, it overflows when we cast size() to an integer, even though the size can be a long. It looks like the problem is ConcatenatingByteArrayCollector and the toByteArray method. The ConcatenatingByteArrayCollector class should handle arbitrary BytesInput by checking its size first and splitting the data across slabs (in this particular case even when the total size is larger than the i32 max value) - I'd say that is the whole point of a concatenating byte array in the first place. Note that the BytesInput instance itself did not fail with an overflow; it only failed when being turned into a byte array. If what you mentioned in the comment were true, I would expect a failure when creating an instance of BytesInput. The collector should be filling up the last slab and allocating new ones as it requests more and more data from BytesInput, similar to an output stream. Regarding your comment around the page size check - it is merely a mitigation strategy that gets past the error; it is not a fix. I already included it in PARQUET-1633. My understanding is that parquet.page.size.row.check.min affects the page size itself. If the page size were larger than 2GB, I would expect the code to fail here: [https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java#L116] or here: [https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java#L123] Those conditions explicitly check the page size, but the code did not fail there.
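The negative size itself is straightforward to reproduce in isolation. The repro writes 40 values of 64 MiB each into a single uncompressed column chunk, roughly 2.5 GiB, and narrowing that long total to int wraps negative. (The exact reported value, -1610612543, additionally includes page header bytes, so this sketch shows only the wrap, not the exact figure.)

```java
public class SizeOverflow {
    public static void main(String[] args) {
        // 40 rows of 64 MiB strings, uncompressed, as in the repro above.
        long totalSize = 40L * 64 * 1024 * 1024; // 2684354560 bytes, > Integer.MAX_VALUE
        int truncated = (int) totalSize;         // narrowing conversion wraps negative
        System.out.println(totalSize + " -> " + truncated); // prints 2684354560 -> -1610612736
    }
}
```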
[jira] [Updated] (PARQUET-1632) Negative initial size when writing large values in parquet-mr
[ https://issues.apache.org/jira/browse/PARQUET-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Sadikov updated PARQUET-1632: -- Description: I encountered an issue when writing large string values to Parquet. Here is the code to reproduce the issue: {code:java} import org.apache.spark.sql.functions._ def longString: String = "a" * (64 * 1024 * 1024) val long_string = udf(() => longString) val df = spark.range(0, 40, 1, 1).withColumn("large_str", long_string()) spark.conf.set("parquet.enable.dictionary", "false") df.write.option("compression", "uncompressed").mode("overwrite").parquet("/tmp/large.parquet"){code} This Spark job fails with the exception: {code:java} Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10861.0 failed 4 times, most recent failure: Lost task 0.3 in stage 10861.0 (TID 671168, 10.0.180.13, executor 14656): org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at org.apache.spark.scheduler.Task.run(Task.scala:112) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 
Caused by: java.lang.IllegalArgumentException: Negative initial size: -1610612543 at java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:74) at org.apache.parquet.bytes.BytesInput$BAOS.<init>(BytesInput.java:234) at org.apache.parquet.bytes.BytesInput$BAOS.<init>(BytesInput.java:232) at org.apache.parquet.bytes.BytesInput.toByteArray(BytesInput.java:202) at org.apache.parquet.bytes.ConcatenatingByteArrayCollector.collect(ConcatenatingByteArrayCollector.java:33) at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:126) at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147) at org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235) at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122) at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172) at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114) at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165) at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42) at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57) at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1560) at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248) ... 11 more{code} Would appreciate if you could help addressing the problem. Thanks! was: I encountered an issue when writing large string values to Parquet. Here is the code to reproduce the issue: {code:java} import org.apache.spark.sql.functions._ def longString: String = "a" * (64 * 1024 * 1024) val long_string = udf(() => longString) val df = spark.range(0, 40, 1, 1).withColumn("large_str", long_string()) spark.conf.set("parquet.enable.dictionary", "false") df.write.option("compression", "uncompressed").mode("overwrite").parquet("/tmp/large.parquet"){code} This Spark job fails with the exception: {code:java} Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10861.0 failed 4 times, most recent failure: Lost task 0.3 in stage 10861.0 (TID
[jira] [Commented] (PARQUET-1632) Negative initial size when writing large values in parquet-mr
[ https://issues.apache.org/jira/browse/PARQUET-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16902098#comment-16902098 ] Ivan Sadikov commented on PARQUET-1632: --- Thank you. > Negative initial size when writing large values in parquet-mr > - > > Key: PARQUET-1632 > URL: https://issues.apache.org/jira/browse/PARQUET-1632 > Project: Parquet > Issue Type: Bug > Components: parquet-mr >Affects Versions: 1.10.1 >Reporter: Ivan Sadikov >Assignee: Junjie Chen >Priority: Major > > I encountered an issue when writing large string values to Parquet. > Here is the code to reproduce the issue: > {code:java} > import org.apache.spark.sql.functions._ > def longString: String = "a" * (64 * 1024 * 1024) > val long_string = udf(() => longString) > val df = spark.range(0, 40, 1, 1).withColumn("large_str", long_string()) > spark.conf.set("parquet.enable.dictionary", "false") > df.write.option("compression", > "uncompressed").mode("overwrite").parquet("/tmp/large.parquet"){code} > > This Spark job fails with the exception: > {code:java} > Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 0 in stage 10861.0 failed 4 times, most recent failure: Lost task 0.3 in > stage 10861.0 (TID 671168, 10.0.180.13, executor 14656): > org.apache.spark.SparkException: Task failed while writing rows. 
at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at > org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at > org.apache.spark.scheduler.Task.run(Task.scala:112) at > org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526) at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) Caused by: > java.lang.IllegalArgumentException: Negative initial size: -1610612543 at > java.io.ByteArrayOutputStream.(ByteArrayOutputStream.java:74) at > org.apache.parquet.bytes.BytesInput$BAOS.(BytesInput.java:234) at > org.apache.parquet.bytes.BytesInput$BAOS.(BytesInput.java:232) at > org.apache.parquet.bytes.BytesInput.toByteArray(BytesInput.java:202) at > org.apache.parquet.bytes.ConcatenatingByteArrayCollector.collect(ConcatenatingByteArrayCollector.java:33) > at > org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:126) > at > org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147) > at > org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235) > at > org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122) > at > 
org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172) > at > org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114) > at > org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165) > at > org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42) > at > org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57) > at > org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242) > at > org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1560) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248) > ... 11 more{code} > > Would appreciate if you could help addressing the problem. Thanks! -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (PARQUET-1633) Integer overflow in ParquetFileReader.ConsecutiveChunkList
[ https://issues.apache.org/jira/browse/PARQUET-1633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16902096#comment-16902096 ] Ivan Sadikov commented on PARQUET-1633: --- I would say it is a bug. I found a few instances of overflow in parquet-mr; another example is https://issues.apache.org/jira/browse/PARQUET-1632. I also have a simple repro with Spark; note that the problem persists even if you make the records shorter: {code:java} import org.apache.spark.sql.functions._ val large_str = udf(() => "a" * (128 * 1024 * 1024)) val df = spark.range(0, 20, 1, 1).withColumn("large_str", large_str()) spark.conf.set("parquet.enable.dictionary", "false") spark.conf.set("parquet.page.size.row.check.min", "1") // this is done so I don't hit PARQUET-1632 df.write.option("compression", "uncompressed").mode("overwrite").parquet("/mnt/large.parquet") spark.read.parquet("/mnt/large.parquet").foreach(_ => Unit) // Fails{code} Here is the stack trace: {code:java} org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 22.0 failed 4 times, most recent failure: Lost task 10.3 in stage 22.0 (TID 121, 10.0.217.97, executor 1): java.lang.IllegalArgumentException: Illegal Capacity: -191 at java.util.ArrayList.(ArrayList.java:157) at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1169) at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:811){code} There is no restriction on row group size in the Parquet format, and it would not take a significant effort to fix this issue: changing the {{ChunkDescriptor}} size field and the {{ConsecutiveChunkList}} length to long should solve the problem (the metadata already stores the size as long). 
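The long-typed fix proposed above can be sketched as follows. This is a minimal standalone illustration, not the actual parquet-mr code: the class name, the helper methods, and the 8 MiB max allocation size are assumptions standing in for `options.getMaxAllocationSize()`.

```java
// Hypothetical sketch of the allocation-count computation in a consecutive
// chunk read. With an int-typed length, any chunk larger than
// Integer.MAX_VALUE has already wrapped negative; with long it works.
public class AllocationSketch {
    static final int MAX_ALLOCATION_SIZE = 8 * 1024 * 1024; // assumed 8 MiB

    // Buggy variant: mirrors an int-typed length field, which has wrapped
    // negative for chunks larger than Integer.MAX_VALUE.
    static int numAllocationsInt(int length) {
        int full = length / MAX_ALLOCATION_SIZE;
        int last = length % MAX_ALLOCATION_SIZE;
        return full + (last > 0 ? 1 : 0);
    }

    // Fixed variant: the length stays long end to end, as the column
    // metadata already stores it.
    static long numAllocationsLong(long length) {
        long full = length / MAX_ALLOCATION_SIZE;
        long last = length % MAX_ALLOCATION_SIZE;
        return full + (last > 0 ? 1 : 0);
    }

    public static void main(String[] args) {
        long chunkSize = 3_243_165_541L; // block 2 totalByteSize from the report
        System.out.println((int) chunkSize);                    // negative after the narrowing cast
        System.out.println(numAllocationsInt((int) chunkSize)); // negative allocation count
        System.out.println(numAllocationsLong(chunkSize));      // correct positive count
    }
}
```

Feeding in the 3,243,165,541-byte chunk from the metadata in the report, the int variant produces a negative allocation count (hence the negative ArrayList capacity), while the long variant yields the correct value.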
> Integer overflow in ParquetFileReader.ConsecutiveChunkList > -- > > Key: PARQUET-1633 > URL: https://issues.apache.org/jira/browse/PARQUET-1633 > Project: Parquet > Issue Type: Bug > Components: parquet-mr >Affects Versions: 1.10.1 >Reporter: Ivan Sadikov >Priority: Major > > When reading a large Parquet file (2.8GB), I encounter the following > exception: > {code:java} > Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value > at 0 in block -1 in file > dbfs:/user/hive/warehouse/demo.db/test_table/part-00014-tid-1888470069989036737-593c82a4-528b-4975-8de0-5bcbc5e9827d-10856-1-c000.snappy.parquet > at > org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251) > at > org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207) > at > org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:40) > at > org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:228) > ... 
14 more > Caused by: java.lang.IllegalArgumentException: Illegal Capacity: -212 > at java.util.ArrayList.(ArrayList.java:157) > at > org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1169){code} > > The file metadata is: > * block 1 (3 columns) > ** rowCount: 110,100 > ** totalByteSize: 348,492,072 > ** compressedSize: 165,689,649 > * block 2 (3 columns) > ** rowCount: 90,054 > ** totalByteSize: 3,243,165,541 > ** compressedSize: 2,509,579,966 > * block 3 (3 columns) > ** rowCount: 105,119 > ** totalByteSize: 350,901,693 > ** compressedSize: 144,952,177 > * block 4 (3 columns) > ** rowCount: 48,741 > ** totalByteSize: 1,275,995 > ** compressedSize: 914,205 > I don't have the code to reproduce the issue, unfortunately; however, I > looked at the code and it seems that integer {{length}} field in > ConsecutiveChunkList overflows, which results in negative capacity for array > list in {{readAll}} method: > {code:java} > int fullAllocations = length / options.getMaxAllocationSize(); > int lastAllocationSize = length % options.getMaxAllocationSize(); > > int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0); > List buffers = new ArrayList<>(numAllocations);{code} > > This is caused by cast to integer in {{readNextRowGroup}} method in > ParquetFileReader: > {code:java} > currentChunks.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, > (int)mc.getTotalSize())); > {code} > which overflows when total size of the column is larger than > Integer.MAX_VALUE. > I would appreciate if you could help addressing the issue. Thanks! > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (PARQUET-1632) Negative initial size when writing large values in parquet-mr
[ https://issues.apache.org/jira/browse/PARQUET-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Sadikov updated PARQUET-1632: -- Description: I encountered an issue when writing large string values to Parquet. Here is the code to reproduce the issue: {code:java} import org.apache.spark.sql.functions._ def longString: String = "a" * (64 * 1024 * 1024) val long_string = udf(() => longString) val df = spark.range(0, 40, 1, 1).withColumn("large_str", long_string()) spark.conf.set("parquet.enable.dictionary", "false") df.write.option("compression", "uncompressed").mode("overwrite").parquet("/tmp/large.parquet"){code} This Spark job fails with the exception: {code:java} Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10861.0 failed 4 times, most recent failure: Lost task 0.3 in stage 10861.0 (TID 671168, 10.0.180.13, executor 14656): org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at org.apache.spark.scheduler.Task.run(Task.scala:112) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 
Caused by: java.lang.IllegalArgumentException: Negative initial size: -1610612543 at java.io.ByteArrayOutputStream.(ByteArrayOutputStream.java:74) at org.apache.parquet.bytes.BytesInput$BAOS.(BytesInput.java:234) at org.apache.parquet.bytes.BytesInput$BAOS.(BytesInput.java:232) at org.apache.parquet.bytes.BytesInput.toByteArray(BytesInput.java:202) at org.apache.parquet.bytes.ConcatenatingByteArrayCollector.collect(ConcatenatingByteArrayCollector.java:33) at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:126) at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147) at org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235) at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122) at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172) at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114) at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165) at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42) at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57) at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1560) at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248) ... 11 more{code} I would appreciate it if you could help address the problem. Thanks! was: I encountered an issue when writing large string values to Parquet. Here is the code to reproduce the issue: {code:java} import org.apache.spark.sql.functions._ def longString: String = "a" * (64 * 1024 * 1024) val long_string = udf(() => longString) val df = spark.range(0, 40, 1, 1).withColumn("large_str", long_string()) spark.conf.set("parquet.enable.dictionary", "false") df.write.option("compression", "uncompressed").mode("overwrite").parquet("/tmp/large.parquet"){code} This Spark job fails with the exception: {code:java} Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10861.0 failed 4 times, most recent failure: Lost task 0.3 in stage 10861.0 (TID
[jira] [Created] (PARQUET-1633) Integer overflow in ParquetFileReader.ConsecutiveChunkList
Ivan Sadikov created PARQUET-1633: - Summary: Integer overflow in ParquetFileReader.ConsecutiveChunkList Key: PARQUET-1633 URL: https://issues.apache.org/jira/browse/PARQUET-1633 Project: Parquet Issue Type: Bug Components: parquet-mr Affects Versions: 1.10.1 Reporter: Ivan Sadikov When reading a large Parquet file (2.8GB), I encounter the following exception: {code:java} Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file dbfs:/user/hive/warehouse/demo.db/test_table/part-00014-tid-1888470069989036737-593c82a4-528b-4975-8de0-5bcbc5e9827d-10856-1-c000.snappy.parquet at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251) at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207) at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:40) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:228) ... 
14 more Caused by: java.lang.IllegalArgumentException: Illegal Capacity: -212 at java.util.ArrayList.(ArrayList.java:157) at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1169){code} The file metadata is: * block 1 (3 columns) ** rowCount: 110,100 ** totalByteSize: 348,492,072 ** compressedSize: 165,689,649 * block 2 (3 columns) ** rowCount: 90,054 ** totalByteSize: 3,243,165,541 ** compressedSize: 2,509,579,966 * block 3 (3 columns) ** rowCount: 105,119 ** totalByteSize: 350,901,693 ** compressedSize: 144,952,177 * block 4 (3 columns) ** rowCount: 48,741 ** totalByteSize: 1,275,995 ** compressedSize: 914,205 I don't have the code to reproduce the issue, unfortunately; however, I looked at the code and it seems that the integer {{length}} field in ConsecutiveChunkList overflows, which results in a negative capacity for the array list in the {{readAll}} method: {code:java} int fullAllocations = length / options.getMaxAllocationSize(); int lastAllocationSize = length % options.getMaxAllocationSize(); int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0); List buffers = new ArrayList<>(numAllocations);{code} This is caused by a cast to int in the {{readNextRowGroup}} method of ParquetFileReader: {code:java} currentChunks.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, (int)mc.getTotalSize())); {code} which overflows when the total size of the column is larger than Integer.MAX_VALUE. I would appreciate it if you could help address the issue. Thanks! -- This message was sent by Atlassian JIRA (v7.6.14#76016)
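The narrowing cast described above can be reproduced in isolation. A minimal sketch (the class name is made up for illustration; the size is the compressedSize of block 2 from the metadata above):

```java
// Demonstrates the failure mode: a column chunk size above Integer.MAX_VALUE
// cast to int wraps around to a negative value, which then fails when used
// as an ArrayList capacity, as in the stack trace above.
import java.util.ArrayList;

public class CastOverflowDemo {
    public static void main(String[] args) {
        long totalSize = 2_509_579_966L;  // larger than Integer.MAX_VALUE
        int truncated = (int) totalSize;   // wraps around to a negative int
        System.out.println(truncated);
        try {
            new ArrayList<byte[]>(truncated); // same "Illegal Capacity" error
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```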
[jira] [Created] (PARQUET-1632) Negative initial size when writing large values in parquet-mr
Ivan Sadikov created PARQUET-1632: - Summary: Negative initial size when writing large values in parquet-mr Key: PARQUET-1632 URL: https://issues.apache.org/jira/browse/PARQUET-1632 Project: Parquet Issue Type: Bug Components: parquet-mr Affects Versions: 1.10.1 Reporter: Ivan Sadikov I encountered an issue when writing large string values to Parquet. Here is the code to reproduce the issue: {code:java} import org.apache.spark.sql.functions._ def longString: String = "a" * (64 * 1024 * 1024) val long_string = udf(() => longString) val df = spark.range(0, 40, 1, 1).withColumn("large_str", long_string()) spark.conf.set("parquet.enable.dictionary", "false") df.write.option("compression", "uncompressed").mode("overwrite").parquet("/tmp/large.parquet"){code} This Spark job fails with the exception: {code:java} Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10861.0 failed 4 times, most recent failure: Lost task 0.3 in stage 10861.0 (TID 671168, 10.0.180.13, executor 14656): org.apache.spark.SparkException: Task failed while writing rows. 
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139) at org.apache.spark.scheduler.Task.run(Task.scala:112) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.IllegalArgumentException: Negative initial size: -1610612543 at java.io.ByteArrayOutputStream.(ByteArrayOutputStream.java:74) at org.apache.parquet.bytes.BytesInput$BAOS.(BytesInput.java:234) at org.apache.parquet.bytes.BytesInput$BAOS.(BytesInput.java:232) at org.apache.parquet.bytes.BytesInput.toByteArray(BytesInput.java:202) at org.apache.parquet.bytes.ConcatenatingByteArrayCollector.collect(ConcatenatingByteArrayCollector.java:33) at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:126) at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147) at org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235) at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122) at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172) at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114) at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165) at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42) at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57) at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1560) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248) ... 11 more{code} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
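The arithmetic is consistent with the reported value: 40 rows of 64 MiB strings total 2,684,354,560 bytes, and narrowing that to int gives -1,610,612,736, within a few hundred bytes (presumably page headers) of the reported -1610612543. A minimal sketch of the failure mode, assuming the total size ends up as an int used to size a ByteArrayOutputStream; the names here are illustrative, not the actual parquet-mr internals:

```java
// Sketch: 40 x 64 MiB of uncompressed string values in one column chunk
// exceeds Integer.MAX_VALUE; narrowed to int, the total goes negative and
// the ByteArrayOutputStream constructor rejects it.
import java.io.ByteArrayOutputStream;

public class NegativeSizeDemo {
    public static void main(String[] args) {
        long totalBytes = 40L * 64 * 1024 * 1024; // 2,684,354,560 bytes of values
        int truncated = (int) totalBytes;          // negative, close to the reported value
        System.out.println(truncated);
        try {
            new ByteArrayOutputStream(truncated);  // throws "Negative initial size"
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```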
[jira] [Commented] (PARQUET-458) [C++] Implement support for DataPageV2
[ https://issues.apache.org/jira/browse/PARQUET-458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16876738#comment-16876738 ] Ivan Sadikov commented on PARQUET-458: -- Hello. If it helps, the Rust Parquet library in Apache Arrow already implements Data Page V2 for both writes and reads, so you can use it as a reference when working on the C++ code and cross-check correctness between Rust and C++. Here are some links to the column reader/writer: https://github.com/apache/arrow/blob/master/rust/parquet/src/column/reader.rs#L342 https://github.com/apache/arrow/blob/master/rust/parquet/src/column/writer.rs#L544 We also added a number of Data Page V2 encodings in Rust, along with quite a few correctness tests to verify that encoding/decoding works. Hopefully this helps, [~wesmckinn]. > [C++] Implement support for DataPageV2 > -- > > Key: PARQUET-458 > URL: https://issues.apache.org/jira/browse/PARQUET-458 > Project: Parquet > Issue Type: New Feature > Components: parquet-cpp >Reporter: Wes McKinney >Assignee: Wes McKinney >Priority: Minor > Fix For: cpp-1.6.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (PARQUET-750) Parquet tools cat/head parity, small fixes
[ https://issues.apache.org/jira/browse/PARQUET-750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15576394#comment-15576394 ] Ivan Sadikov commented on PARQUET-750: -- PR: https://github.com/apache/parquet-mr/pull/378 > Parquet tools cat/head parity, small fixes > -- > > Key: PARQUET-750 > URL: https://issues.apache.org/jira/browse/PARQUET-750 > Project: Parquet > Issue Type: Bug >Reporter: Ivan Sadikov >Priority: Trivial > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (PARQUET-750) Parquet tools cat/head parity, small fixes
Ivan Sadikov created PARQUET-750: Summary: Parquet tools cat/head parity, small fixes Key: PARQUET-750 URL: https://issues.apache.org/jira/browse/PARQUET-750 Project: Parquet Issue Type: Bug Reporter: Ivan Sadikov Priority: Trivial -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (PARQUET-748) Update assembly for parquet tools
[ https://issues.apache.org/jira/browse/PARQUET-748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Sadikov resolved PARQUET-748. -- Resolution: Won't Fix > Update assembly for parquet tools > - > > Key: PARQUET-748 > URL: https://issues.apache.org/jira/browse/PARQUET-748 > Project: Parquet > Issue Type: Improvement >Reporter: Ivan Sadikov >Priority: Trivial > > Small update for {{parquet-tools}} module to be able to create distribution > including shell scripts and all dependencies. Also documentation update. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (PARQUET-748) Update assembly for parquet tools
Ivan Sadikov created PARQUET-748: Summary: Update assembly for parquet tools Key: PARQUET-748 URL: https://issues.apache.org/jira/browse/PARQUET-748 Project: Parquet Issue Type: Improvement Reporter: Ivan Sadikov Priority: Trivial A small update to the {{parquet-tools}} module so that it can create a distribution including shell scripts and all dependencies, plus a documentation update. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (PARQUET-748) Update assembly for parquet tools
[ https://issues.apache.org/jira/browse/PARQUET-748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15566544#comment-15566544 ] Ivan Sadikov commented on PARQUET-748: -- I will work on this; it is fairly trivial and will help me get a bit familiar with the code base. > Update assembly for parquet tools > - > > Key: PARQUET-748 > URL: https://issues.apache.org/jira/browse/PARQUET-748 > Project: Parquet > Issue Type: Improvement >Reporter: Ivan Sadikov >Priority: Trivial > > Small update for {{parquet-tools}} module to be able to create distribution > including shell scripts and all dependencies. Also documentation update. -- This message was sent by Atlassian JIRA (v6.3.4#6332)