flink git commit: [FLINK-8423] OperatorChain#pushToOperator catch block may fail with NPE
Repository: flink Updated Branches: refs/heads/release-1.4 beff62d2e -> bafb91eeb [FLINK-8423] OperatorChain#pushToOperator catch block may fail with NPE This closes #5447. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bafb91ee Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bafb91ee Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bafb91ee Branch: refs/heads/release-1.4 Commit: bafb91eeb50a2821771a852919b2358fb43622f2 Parents: beff62d Author: zhangminglei Authored: Tue Feb 13 10:33:04 2018 +0800 Committer: zentol Committed: Tue Feb 13 09:53:45 2018 +0100 -- .../streaming/runtime/tasks/OperatorChain.java | 25 +++- 1 file changed, 14 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/bafb91ee/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index a44cffb..1c117ac 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -548,17 +548,20 @@ public class OperatorChain> implements Strea operator.setKeyContextElement1(copy); operator.processElement(copy); } catch (ClassCastException e) { - // Enrich error message - ClassCastException replace = new ClassCastException( - String.format( - "%s. Failed to push OutputTag with id '%s' to operator. " + - "This can occur when multiple OutputTags with different types " + - "but identical names are being used.", - e.getMessage(), - outputTag.getId())); - - throw new ExceptionInChainedOperatorException(replace); - + if (outputTag != null) { + // Enrich error message + ClassCastException replace = new ClassCastException( + String.format( + "%s. Failed to push OutputTag with id '%s' to operator. " + + "This can occur when multiple OutputTags with different types " + + "but identical names are being used.", + e.getMessage(), + outputTag.getId())); + + throw new ExceptionInChainedOperatorException(replace); + } else { + throw new ExceptionInChainedOperatorException(e); + } } catch (Exception e) { throw new ExceptionInChainedOperatorException(e); }
flink git commit: [FLINK-8423] OperatorChain#pushToOperator catch block may fail with NPE
Repository: flink Updated Branches: refs/heads/master 24c30878e -> 9e139a72b [FLINK-8423] OperatorChain#pushToOperator catch block may fail with NPE This closes #5447. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e139a72 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e139a72 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e139a72 Branch: refs/heads/master Commit: 9e139a72ba45f2dd820bd3b9ecdf8428588666fd Parents: 24c3087 Author: zhangminglei Authored: Tue Feb 13 10:33:04 2018 +0800 Committer: zentol Committed: Tue Feb 13 09:51:57 2018 +0100 -- .../streaming/runtime/tasks/OperatorChain.java | 25 +++- 1 file changed, 14 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/9e139a72/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index fdeea17..f3c7293 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -591,17 +591,20 @@ public class OperatorChain> implements Strea operator.setKeyContextElement1(copy); operator.processElement(copy); } catch (ClassCastException e) { - // Enrich error message - ClassCastException replace = new ClassCastException( - String.format( - "%s. Failed to push OutputTag with id '%s' to operator. " + - "This can occur when multiple OutputTags with different types " + - "but identical names are being used.", - e.getMessage(), - outputTag.getId())); - - throw new ExceptionInChainedOperatorException(replace); - + if (outputTag != null) { + // Enrich error message + ClassCastException replace = new ClassCastException( + String.format( + "%s. Failed to push OutputTag with id '%s' to operator. " + + "This can occur when multiple OutputTags with different types " + + "but identical names are being used.", + e.getMessage(), + outputTag.getId())); + + throw new ExceptionInChainedOperatorException(replace); + } else { + throw new ExceptionInChainedOperatorException(e); + } } catch (Exception e) { throw new ExceptionInChainedOperatorException(e); }