flink git commit: [FLINK-7385] Fix ArrayIndexOutOfBoundsException when object-reuse is enabled
Repository: flink Updated Branches: refs/heads/release-1.3 cd4c2b590 -> c1f578fba [FLINK-7385] Fix ArrayIndexOutOfBoundsException when object-reuse is enabled This closes #4496. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1f578fb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1f578fb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1f578fb Branch: refs/heads/release-1.3 Commit: c1f578fba60be7b77e1588367721f57b52b61225 Parents: cd4c2b5 Author: Xpray Authored: Tue Aug 8 16:18:26 2017 +0800 Committer: Tzu-Li (Gordon) Tai Committed: Tue Aug 8 19:22:09 2017 +0800 -- .../streaming/runtime/tasks/OperatorChain.java | 12 .../streaming/api/StreamingOperatorsITCase.java| 17 + 2 files changed, 25 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/c1f578fb/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 870c2ed..0875279 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -612,8 +612,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea output.collect(shallowCopy); } - // don't copy for the last output - outputs[outputs.length - 1].collect(record); + if (outputs.length > 0) { + // don't copy for the last output + outputs[outputs.length - 1].collect(record); + } } @Override @@ -625,8 +627,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea output.collect(outputTag, shallowCopy); } - // don't copy for the last output - outputs[outputs.length - 1].collect(outputTag, record); + if (outputs.length > 0) { + // don't copy for the last output + 
outputs[outputs.length - 1].collect(outputTag, record); + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/c1f578fb/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java -- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java index 8ea1bd8..39a8dd7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.test.streaming.api; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapFunction; @@ -34,6 +35,8 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; + +import org.apache.flink.util.Collector; import org.apache.flink.util.MathUtils; import org.junit.*; @@ -373,4 +376,18 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase collections.clear(); } } + + @Test + public void testOperatorChainWithObjectReuseAndNoOutputOperators() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableObjectReuse(); + DataStream<Integer> input = env.fromElements(1, 2, 3); + input.flatMap(new FlatMapFunction<Integer, Integer>() { + @Override + public void flatMap(Integer value, Collector<Integer> out) throws Exception { + out.collect(value << 1); + } + }); + env.execute(); +
flink git commit: [FLINK-7385] Fix ArrayIndexOutOfBoundsException when object-reuse is enabled
Repository: flink Updated Branches: refs/heads/master 4dfefd042 -> 6f5fa7f74 [FLINK-7385] Fix ArrayIndexOutOfBoundsException when object-reuse is enabled This closes #4496. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f5fa7f7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f5fa7f7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f5fa7f7 Branch: refs/heads/master Commit: 6f5fa7f741538207244368c275bee9958c43a25a Parents: 4dfefd0 Author: Xpray Authored: Tue Aug 8 16:18:26 2017 +0800 Committer: Tzu-Li (Gordon) Tai Committed: Tue Aug 8 19:20:32 2017 +0800 -- .../streaming/runtime/tasks/OperatorChain.java | 12 .../streaming/api/StreamingOperatorsITCase.java| 17 + 2 files changed, 25 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6f5fa7f7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 0f29b73..b15f126 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -612,8 +612,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea output.collect(shallowCopy); } - // don't copy for the last output - outputs[outputs.length - 1].collect(record); + if (outputs.length > 0) { + // don't copy for the last output + outputs[outputs.length - 1].collect(record); + } } @Override @@ -625,8 +627,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea output.collect(outputTag, shallowCopy); } - // don't copy for the last output - outputs[outputs.length - 1].collect(outputTag, record); + if (outputs.length > 0) { + // don't copy for the last output + 
outputs[outputs.length - 1].collect(outputTag, record); + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/6f5fa7f7/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java -- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java index 6d2f8c5..32a04fa 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.test.streaming.api; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapFunction; @@ -34,6 +35,8 @@ import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; + +import org.apache.flink.util.Collector; import org.apache.flink.util.MathUtils; import org.junit.Assert; @@ -378,4 +381,18 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase collections.clear(); } } + + @Test + public void testOperatorChainWithObjectReuseAndNoOutputOperators() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableObjectReuse(); + DataStream<Integer> input = env.fromElements(1, 2, 3); + input.flatMap(new FlatMapFunction<Integer, Integer>() { + @Override + public void flatMap(Integer value, Collector<Integer> out) throws Exception { + out.collect(value << 1); + } + }); + env.execute(); +