flink git commit: [FLINK-8423] OperatorChain#pushToOperator catch block may fail with NPE

2018-02-13 Thread chesnay
Repository: flink
Updated Branches:
  refs/heads/release-1.4 beff62d2e -> bafb91eeb


[FLINK-8423] OperatorChain#pushToOperator catch block may fail with NPE

This closes #5447.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bafb91ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bafb91ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bafb91ee

Branch: refs/heads/release-1.4
Commit: bafb91eeb50a2821771a852919b2358fb43622f2
Parents: beff62d
Author: zhangminglei 
Authored: Tue Feb 13 10:33:04 2018 +0800
Committer: zentol 
Committed: Tue Feb 13 09:53:45 2018 +0100

--
 .../streaming/runtime/tasks/OperatorChain.java  | 25 +++-
 1 file changed, 14 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/bafb91ee/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
--
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index a44cffb..1c117ac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -548,17 +548,20 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
operator.setKeyContextElement1(copy);
operator.processElement(copy);
} catch (ClassCastException e) {
-   // Enrich error message
-   ClassCastException replace = new ClassCastException(
-   String.format(
-   "%s. Failed to push OutputTag with id '%s' to operator. " +
-   "This can occur when multiple OutputTags with different types " +
-   "but identical names are being used.",
-   e.getMessage(),
-   outputTag.getId()));
-
-   throw new ExceptionInChainedOperatorException(replace);
-
+   if (outputTag != null) {
+   // Enrich error message
+   ClassCastException replace = new ClassCastException(
+   String.format(
+   "%s. Failed to push OutputTag with id '%s' to operator. " +
+   "This can occur when multiple OutputTags with different types " +
+   "but identical names are being used.",
+   e.getMessage(),
+   outputTag.getId()));
+
+   throw new ExceptionInChainedOperatorException(replace);
+   } else {
+   throw new ExceptionInChainedOperatorException(e);
+   }
} catch (Exception e) {
throw new ExceptionInChainedOperatorException(e);
}



flink git commit: [FLINK-8423] OperatorChain#pushToOperator catch block may fail with NPE

2018-02-13 Thread chesnay
Repository: flink
Updated Branches:
  refs/heads/master 24c30878e -> 9e139a72b


[FLINK-8423] OperatorChain#pushToOperator catch block may fail with NPE

This closes #5447.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e139a72
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e139a72
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e139a72

Branch: refs/heads/master
Commit: 9e139a72ba45f2dd820bd3b9ecdf8428588666fd
Parents: 24c3087
Author: zhangminglei 
Authored: Tue Feb 13 10:33:04 2018 +0800
Committer: zentol 
Committed: Tue Feb 13 09:51:57 2018 +0100

--
 .../streaming/runtime/tasks/OperatorChain.java  | 25 +++-
 1 file changed, 14 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/9e139a72/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
--
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index fdeea17..f3c7293 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -591,17 +591,20 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
operator.setKeyContextElement1(copy);
operator.processElement(copy);
} catch (ClassCastException e) {
-   // Enrich error message
-   ClassCastException replace = new ClassCastException(
-   String.format(
-   "%s. Failed to push OutputTag with id '%s' to operator. " +
-   "This can occur when multiple OutputTags with different types " +
-   "but identical names are being used.",
-   e.getMessage(),
-   outputTag.getId()));
-
-   throw new ExceptionInChainedOperatorException(replace);
-
+   if (outputTag != null) {
+   // Enrich error message
+   ClassCastException replace = new ClassCastException(
+   String.format(
+   "%s. Failed to push OutputTag with id '%s' to operator. " +
+   "This can occur when multiple OutputTags with different types " +
+   "but identical names are being used.",
+   e.getMessage(),
+   outputTag.getId()));
+
+   throw new ExceptionInChainedOperatorException(replace);
+   } else {
+   throw new ExceptionInChainedOperatorException(e);
+   }
} catch (Exception e) {
throw new ExceptionInChainedOperatorException(e);
}