Repository: incubator-beam Updated Branches: refs/heads/master b8951c231 -> d5b1d5135
[BEAM-207] Flink test flake in ReadSourceStreamingITCase Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dd8bc93e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dd8bc93e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dd8bc93e Branch: refs/heads/master Commit: dd8bc93ec3104c481a9ea646406194c1116dae71 Parents: 70e6a13 Author: Maximilian Michels <m...@apache.org> Authored: Tue Apr 19 09:20:30 2016 +0200 Committer: Maximilian Michels <m...@apache.org> Committed: Wed Apr 20 19:17:50 2016 +0200 ---------------------------------------------------------------------- .../flink/translation/wrappers/SourceInputFormat.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd8bc93e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java index dc11c77..debd1a1 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java @@ -46,8 +46,8 @@ public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>> private transient PipelineOptions options; private final SerializedPipelineOptions serializedOptions; - private transient BoundedSource.BoundedReader<T> reader = null; - private boolean inputAvailable = true; + private transient BoundedSource.BoundedReader<T> reader; + private boolean inputAvailable = false; public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) { this.initialSource = initialSource; @@ -135,6 +135,9 @@ public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>> @Override public void close() throws IOException { - reader.close(); + // TODO null check can be removed once FLINK-3796 is fixed + if (reader != null) { + reader.close(); + } } }