Repository: incubator-beam
Updated Branches:
  refs/heads/master b8951c231 -> d5b1d5135


[BEAM-207] Flink test flake in ReadSourceStreamingITCase


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dd8bc93e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dd8bc93e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dd8bc93e

Branch: refs/heads/master
Commit: dd8bc93ec3104c481a9ea646406194c1116dae71
Parents: 70e6a13
Author: Maximilian Michels <m...@apache.org>
Authored: Tue Apr 19 09:20:30 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Wed Apr 20 19:17:50 2016 +0200

----------------------------------------------------------------------
 .../flink/translation/wrappers/SourceInputFormat.java       | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd8bc93e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
index dc11c77..debd1a1 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -46,8 +46,8 @@ public class SourceInputFormat<T> implements InputFormat<T, 
SourceInputSplit<T>>
   private transient PipelineOptions options;
   private final SerializedPipelineOptions serializedOptions;
 
-  private transient BoundedSource.BoundedReader<T> reader = null;
-  private boolean inputAvailable = true;
+  private transient BoundedSource.BoundedReader<T> reader;
+  private boolean inputAvailable = false;
 
   public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions 
options) {
     this.initialSource = initialSource;
@@ -135,6 +135,9 @@ public class SourceInputFormat<T> implements InputFormat<T, 
SourceInputSplit<T>>
 
   @Override
   public void close() throws IOException {
-    reader.close();
+    // TODO null check can be removed once FLINK-3796 is fixed
+    if (reader != null) {
+      reader.close();
+    }
   }
 }

Reply via email to