[FLINK-3197] [core] Close InputStream in BinaryInputFormat#createStatistics reliably
This closes #1494 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c556f74 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c556f74 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c556f74 Branch: refs/heads/master Commit: 2c556f74e491a47e7c10e7b8cc03e5c65cf34d23 Parents: 9365441 Author: Ajay Bhat <a.ajay.b...@gmail.com> Authored: Fri Jan 8 12:31:02 2016 +0530 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Jan 15 11:44:21 2016 +0100 ---------------------------------------------------------------------- .../flink/api/common/io/BinaryInputFormat.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2c556f74/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java index 61e3a1a..e738d52 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java @@ -192,7 +192,7 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> { /** * Fill in the statistics. The last modification time and the total input size are prefilled. - * + * * @param files * The files that are associated with this block input format. * @param stats @@ -213,11 +213,13 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> { continue; } - FSDataInputStream fdis = file.getPath().getFileSystem().open(file.getPath(), blockInfo.getInfoSize()); - fdis.seek(file.getLen() - blockInfo.getInfoSize()); - - blockInfo.read(new DataInputViewStreamWrapper(fdis)); - totalCount += blockInfo.getAccumulatedRecordCount(); + FileSystem fs = file.getPath().getFileSystem(); + try (FSDataInputStream fdis = fs.open(file.getPath(), blockInfo.getInfoSize())) { + fdis.seek(file.getLen() - blockInfo.getInfoSize()); + + blockInfo.read(new DataInputViewStreamWrapper(fdis)); + totalCount += blockInfo.getAccumulatedRecordCount(); + } } final float avgWidth = totalCount == 0 ? 0 : ((float) stats.getTotalInputSize() / totalCount); @@ -270,7 +272,7 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> { if (this.reachedEnd()) { return null; } - + record = this.deserialize(record, this.dataInputStream); this.readRecords++; return record;