flink git commit: [FLINK-7385] Fix ArrayIndexOutOfBoundsException when object-reuse is enabled

2017-08-08 Thread tzulitai
Repository: flink
Updated Branches:
  refs/heads/release-1.3 cd4c2b590 -> c1f578fba


[FLINK-7385] Fix ArrayIndexOutOfBoundsException when object-reuse is enabled

This closes #4496.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1f578fb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1f578fb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1f578fb

Branch: refs/heads/release-1.3
Commit: c1f578fba60be7b77e1588367721f57b52b61225
Parents: cd4c2b5
Author: Xpray 
Authored: Tue Aug 8 16:18:26 2017 +0800
Committer: Tzu-Li (Gordon) Tai 
Committed: Tue Aug 8 19:22:09 2017 +0800

--
 .../streaming/runtime/tasks/OperatorChain.java | 12 
 .../streaming/api/StreamingOperatorsITCase.java| 17 +
 2 files changed, 25 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/c1f578fb/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 870c2ed..0875279 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -612,8 +612,10 @@ public class OperatorChain implements Strea
output.collect(shallowCopy);
}
 
-   // don't copy for the last output
-   outputs[outputs.length - 1].collect(record);
+   if (outputs.length > 0) {
+   // don't copy for the last output
+   outputs[outputs.length - 1].collect(record);
+   }
}
 
@Override
@@ -625,8 +627,10 @@ public class OperatorChain implements Strea
output.collect(outputTag, shallowCopy);
}
 
-   // don't copy for the last output
-   outputs[outputs.length - 1].collect(outputTag, record);
+   if (outputs.length > 0) {
+   // don't copy for the last output
+   outputs[outputs.length - 1].collect(outputTag, 
record);
+   }
}
}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c1f578fb/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
--
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
index 8ea1bd8..39a8dd7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.streaming.api;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -34,6 +35,8 @@ import 
org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.MathUtils;
 import org.junit.*;
 
@@ -373,4 +376,18 @@ public class StreamingOperatorsITCase extends 
StreamingMultipleProgramsTestBase
collections.clear();
}
}
+
+   @Test
+   public void testOperatorChainWithObjectReuseAndNoOutputOperators() 
throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.getConfig().enableObjectReuse();
+   DataStream input = env.fromElements(1, 2, 3);
+   input.flatMap(new FlatMapFunction() {
+   @Override
+   public void flatMap(Integer value, Collector 
out) throws Exception {
+  

flink git commit: [FLINK-7385] Fix ArrayIndexOutOfBoundsException when object-reuse is enabled

2017-08-08 Thread tzulitai
Repository: flink
Updated Branches:
  refs/heads/master 4dfefd042 -> 6f5fa7f74


[FLINK-7385] Fix ArrayIndexOutOfBoundsException when object-reuse is enabled

This closes #4496.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f5fa7f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f5fa7f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f5fa7f7

Branch: refs/heads/master
Commit: 6f5fa7f741538207244368c275bee9958c43a25a
Parents: 4dfefd0
Author: Xpray 
Authored: Tue Aug 8 16:18:26 2017 +0800
Committer: Tzu-Li (Gordon) Tai 
Committed: Tue Aug 8 19:20:32 2017 +0800

--
 .../streaming/runtime/tasks/OperatorChain.java | 12 
 .../streaming/api/StreamingOperatorsITCase.java| 17 +
 2 files changed, 25 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6f5fa7f7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 0f29b73..b15f126 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -612,8 +612,10 @@ public class OperatorChain implements Strea
output.collect(shallowCopy);
}
 
-   // don't copy for the last output
-   outputs[outputs.length - 1].collect(record);
+   if (outputs.length > 0) {
+   // don't copy for the last output
+   outputs[outputs.length - 1].collect(record);
+   }
}
 
@Override
@@ -625,8 +627,10 @@ public class OperatorChain implements Strea
output.collect(outputTag, shallowCopy);
}
 
-   // don't copy for the last output
-   outputs[outputs.length - 1].collect(outputTag, record);
+   if (outputs.length > 0) {
+   // don't copy for the last output
+   outputs[outputs.length - 1].collect(outputTag, 
record);
+   }
}
}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6f5fa7f7/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
--
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
index 6d2f8c5..32a04fa 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.streaming.api;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -34,6 +35,8 @@ import 
org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.MathUtils;
 
 import org.junit.Assert;
@@ -378,4 +381,18 @@ public class StreamingOperatorsITCase extends 
StreamingMultipleProgramsTestBase
collections.clear();
}
}
+
+   @Test
+   public void testOperatorChainWithObjectReuseAndNoOutputOperators() 
throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.getConfig().enableObjectReuse();
+   DataStream input = env.fromElements(1, 2, 3);
+   input.flatMap(new FlatMapFunction() {
+   @Override
+   public void flatMap(Integer value, Collector 
out) throws Exception {
+