[FLINK-3197] [core] Close InputStream in BinaryInputFormat#createStatistics 
reliably

This closes #1494


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c556f74
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c556f74
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c556f74

Branch: refs/heads/master
Commit: 2c556f74e491a47e7c10e7b8cc03e5c65cf34d23
Parents: 9365441
Author: Ajay Bhat <a.ajay.b...@gmail.com>
Authored: Fri Jan 8 12:31:02 2016 +0530
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jan 15 11:44:21 2016 +0100

----------------------------------------------------------------------
 .../flink/api/common/io/BinaryInputFormat.java      | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2c556f74/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
index 61e3a1a..e738d52 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
@@ -192,7 +192,7 @@ public abstract class BinaryInputFormat<T> extends 
FileInputFormat<T> {
 
        /**
         * Fill in the statistics. The last modification time and the total 
input size are prefilled.
-        * 
+        *
         * @param files
         *        The files that are associated with this block input format.
         * @param stats
@@ -213,11 +213,13 @@ public abstract class BinaryInputFormat<T> extends 
FileInputFormat<T> {
                                continue;
                        }
 
-                       FSDataInputStream fdis = 
file.getPath().getFileSystem().open(file.getPath(), blockInfo.getInfoSize());
-                       fdis.seek(file.getLen() - blockInfo.getInfoSize());
-                       
-                       blockInfo.read(new DataInputViewStreamWrapper(fdis));
-                       totalCount += blockInfo.getAccumulatedRecordCount();
+                       FileSystem fs = file.getPath().getFileSystem();
+                       try (FSDataInputStream fdis = fs.open(file.getPath(), 
blockInfo.getInfoSize())) {
+                               fdis.seek(file.getLen() - 
blockInfo.getInfoSize());
+
+                               blockInfo.read(new 
DataInputViewStreamWrapper(fdis));
+                               totalCount += 
blockInfo.getAccumulatedRecordCount();
+                       }
                }
 
                final float avgWidth = totalCount == 0 ? 0 : ((float) 
stats.getTotalInputSize() / totalCount);
@@ -270,7 +272,7 @@ public abstract class BinaryInputFormat<T> extends 
FileInputFormat<T> {
                if (this.reachedEnd()) {
                        return null;
                }
-               
+
                record = this.deserialize(record, this.dataInputStream);
                this.readRecords++;
                return record;

Reply via email to