Hyukjin Kwon created PARQUET-1368:
-------------------------------------

             Summary: ParquetFileReader should close its input stream for the failure in constructor
                 Key: PARQUET-1368
                 URL: https://issues.apache.org/jira/browse/PARQUET-1368
             Project: Parquet
          Issue Type: Bug
          Components: parquet-mr
    Affects Versions: 1.10.0
            Reporter: Hyukjin Kwon


I was trying to replace the deprecated {{readFooter}} usage with 
{{ParquetFileReader.open}}, following this note:

{code}

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala:368: method readFooter in object ParquetFileReader is deprecated: see corresponding Javadoc for more information.
[warn]         ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
[warn]                           ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala:545: method readFooter in object ParquetFileReader is deprecated: see corresponding Javadoc for more information.
[warn]             ParquetFileReader.readFooter(
[warn]                               ^
{code}
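
For reference, the replacement looks roughly like this (a minimal sketch only; {{readFooterViaOpen}} is a hypothetical helper, and {{sharedConf}}/{{filePath}} are the configuration and path from the call sites above):

{code}
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

// Hypothetical helper: reads only the footer metadata, skipping row groups,
// like the deprecated readFooter(sharedConf, filePath, SKIP_ROW_GROUPS) did.
static FileMetaData readFooterViaOpen(Configuration sharedConf, Path filePath) throws IOException {
  ParquetReadOptions options = HadoopReadOptions.builder(sharedConf)
      .withMetadataFilter(ParquetMetadataConverter.SKIP_ROW_GROUPS)
      .build();
  // ParquetFileReader implements Closeable; try-with-resources closes the
  // underlying stream once construction has succeeded.
  try (ParquetFileReader reader =
           ParquetFileReader.open(HadoopInputFile.fromPath(filePath, sharedConf), options)) {
    return reader.getFooter().getFileMetaData();
  }
}
{code}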

Then I realised some test suites report a resource leak:

{code}
java.lang.Throwable
        at org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
        at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
        at org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:65)
        at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:687)
        at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:595)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.createParquetReader(ParquetUtils.scala:67)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.readFooter(ParquetUtils.scala:46)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:544)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:539)
        at scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
        at scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
        at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
        at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1068)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
        at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
        at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:378)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:443)
        at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:426)
        at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:56)
        at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:958)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
        at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:953)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

The root cause seems to be that the test case intentionally reads a malformed 
Parquet file to check that the error is handled correctly.

In that case, the error is thrown from the {{ParquetFileReader}} constructor:

{code}
java.lang.RuntimeException: file:/private/var/folders/71/484zt4z10ks1vydt03bhp6hr0000gp/T/spark-c102dafc-b3f7-4c7e-90ee-33d8ecbcd225/second/_SUCCESS is not a Parquet file (too small length: 0)
        at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:514)
        at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:689)
        at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:595)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.createParquetReader(ParquetUtils.scala:67)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.readFooter(ParquetUtils.scala:46)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:544)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:539)
        at scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
        at scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
        at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
        at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1068)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

So, in this case, given the constructor:

{code}
  public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
    this.converter = new ParquetMetadataConverter(options);
    this.file = file;
    this.f = file.newStream();
    this.options = options;
    this.footer = readFooter(file, options, f, converter);
    this.fileMetaData = footer.getFileMetaData();
    this.blocks = filterRowGroups(footer.getBlocks());
    for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
      paths.put(ColumnPath.get(col.getPath()), col);
    }
  }
{code}

the stream opened by {{this.f = file.newStream()}} can never be closed once 
{{readFooter}} throws: the constructor fails before the caller ever gets a 
reader to close.

This is why the test suites report the resource leak.
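
One possible fix would be to close the stream in the constructor itself before rethrowing (a rough sketch only, not a tested patch; a complete fix might also need to guard the calls after {{readFooter}}):

{code}
  public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
    this.converter = new ParquetMetadataConverter(options);
    this.file = file;
    this.f = file.newStream();
    this.options = options;
    try {
      this.footer = readFooter(file, options, f, converter);
    } catch (Exception e) {
      // The caller never receives the reader, so nobody else can close the
      // stream; close it here before propagating the failure.
      f.close();
      throw e;
    }
    this.fileMetaData = footer.getFileMetaData();
    this.blocks = filterRowGroups(footer.getBlocks());
    for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
      paths.put(ColumnPath.get(col.getPath()), col);
    }
  }
{code}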

In the old deprecated {{readFooter}}, by contrast, the stream is closed with try-with-resources:

{code}
  @Deprecated
  public static final ParquetMetadata readFooter(InputFile file, MetadataFilter filter) throws IOException {
    ParquetReadOptions options;
    if (file instanceof HadoopInputFile) {
      options = HadoopReadOptions.builder(((HadoopInputFile) file).getConfiguration())
          .withMetadataFilter(filter).build();
    } else {
      options = ParquetReadOptions.builder().withMetadataFilter(filter).build();
    }

    try (SeekableInputStream in = file.newStream()) {
      return readFooter(file, options, in);
    }
  }
{code}

So the deprecated method itself is fine; the leak only affects the new {{ParquetFileReader.open}} path.
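
Note that callers cannot work around this on their side: try-with-resources only closes the reader when the constructor succeeds. If the constructor throws, there is no object to close ({{inputFile}} and {{options}} below are placeholders):

{code}
// If ParquetFileReader.open(...) throws from the constructor, `reader` is never
// assigned, so try-with-resources has nothing to close and the stream opened by
// file.newStream() leaks. The cleanup has to happen inside ParquetFileReader itself.
try (ParquetFileReader reader = ParquetFileReader.open(inputFile, options)) {
  return reader.getFooter().getFileMetaData();
}
{code}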



