[jira] [Commented] (FLINK-8423) OperatorChain#pushToOperator catch block may fail with NPE

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362009#comment-16362009
 ] 

ASF GitHub Bot commented on FLINK-8423:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5447


> OperatorChain#pushToOperator catch block may fail with NPE
> --
>
> Key: FLINK-8423
> URL: https://issues.apache.org/jira/browse/FLINK-8423
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.4.0, 1.5.0
> Reporter: Chesnay Schepler
> Assignee: mingleizhang
> Priority: Critical
> Fix For: 1.5.0, 1.4.2
>
>
> {code}
> @Override
> protected <X> void pushToOperator(StreamRecord<X> record) {
>     try {
>         // we know that the given outputTag matches our OutputTag so the record
>         // must be of the type that our operator (and Serializer) expects.
>         @SuppressWarnings("unchecked")
>         StreamRecord<T> castRecord = (StreamRecord<T>) record;
>         numRecordsIn.inc();
>         StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
>         operator.setKeyContextElement1(copy);
>         operator.processElement(copy);
>     } catch (ClassCastException e) {
>         // Enrich error message
>         ClassCastException replace = new ClassCastException(
>             String.format(
>                 "%s. Failed to push OutputTag with id '%s' to operator. " +
>                 "This can occur when multiple OutputTags with different types " +
>                 "but identical names are being used.",
>                 e.getMessage(),
>                 outputTag.getId()));
>         throw new ExceptionInChainedOperatorException(replace);
>     } catch (Exception e) {
>         throw new ExceptionInChainedOperatorException(e);
>     }
> }
> {code}
> If outputTag is null (as is the case when no sideOutput was defined), the
> catch block will crash with a NullPointerException. This may happen if
> {{operator.processElement}} throws a ClassCastException.
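
The failure mode described in the issue can be reproduced outside Flink. The following is only a minimal sketch: `Tag` is a hypothetical stand-in for Flink's `OutputTag`, a plain `RuntimeException` stands in for `ExceptionInChainedOperatorException`, and `push` and `thrownBy` are illustrative names that do not exist in Flink.

```java
final class CatchBlockNpeDemo {
    /** Hypothetical stand-in for Flink's OutputTag; only the id matters here. */
    static final class Tag {
        private final String id;
        Tag(String id) { this.id = id; }
        String getId() { return id; }
    }

    /** Mimics the pre-fix catch block: it dereferences outputTag unconditionally. */
    static void push(Tag outputTag, Runnable operator) {
        try {
            operator.run();
        } catch (ClassCastException e) {
            // If outputTag is null, getId() throws a NullPointerException right here,
            // and the original ClassCastException never reaches the caller.
            throw new RuntimeException(new ClassCastException(String.format(
                "%s. Failed to push OutputTag with id '%s' to operator.",
                e.getMessage(), outputTag.getId())));
        }
    }

    /** Returns the simple class name of whatever push() throws, or "none". */
    static String thrownBy(Tag outputTag) {
        try {
            push(outputTag, () -> { throw new ClassCastException("bad cast"); });
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(thrownBy(new Tag("late-data"))); // wrapper with enriched message
        System.out.println(thrownBy(null));                 // NPE raised inside the catch block
    }
}
```

With a non-null tag the caller sees the wrapper exception; with a null tag the caller sees only the `NullPointerException`, which hides the actual cause.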



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8423) OperatorChain#pushToOperator catch block may fail with NPE

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362004#comment-16362004
 ] 

ASF GitHub Bot commented on FLINK-8423:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5447
  
merging.




[jira] [Commented] (FLINK-8423) OperatorChain#pushToOperator catch block may fail with NPE

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361733#comment-16361733
 ] 

ASF GitHub Bot commented on FLINK-8423:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5447#discussion_r167746607
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -591,16 +591,28 @@ public void collect(StreamRecord record) {
         operator.setKeyContextElement1(copy);
         operator.processElement(copy);
     } catch (ClassCastException e) {
-        // Enrich error message
-        ClassCastException replace = new ClassCastException(
-            String.format(
-                "%s. Failed to push OutputTag with id '%s' to operator. " +
-                "This can occur when multiple OutputTags with different types " +
-                "but identical names are being used.",
-                e.getMessage(),
-                outputTag.getId()));
-
-        throw new ExceptionInChainedOperatorException(replace);
+        ClassCastException replace;
+        if (outputTag != null) {
+            // Enrich error message
+            replace = new ClassCastException(
+                String.format(
+                    "%s. Failed to push OutputTag with id '%s' to operator. " +
+                    "This can occur when multiple OutputTags with different types " +
+                    "but identical names are being used.",
+                    e.getMessage(),
+                    outputTag.getId()));
+
+            throw new ExceptionInChainedOperatorException(replace);
+        } else {
+            replace = new ClassCastException(
+                String.format(
+                    "%s. Failed to push OutputTag with id '%s' to operator. " +
--- End diff --

Sorry for my misunderstanding. I'm not yet familiar with Flink's code base.
Thanks~




[jira] [Commented] (FLINK-8423) OperatorChain#pushToOperator catch block may fail with NPE

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361156#comment-16361156
 ] 

ASF GitHub Bot commented on FLINK-8423:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5447#discussion_r167633643
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -591,16 +591,28 @@ public void collect(StreamRecord record) {
         operator.setKeyContextElement1(copy);
         operator.processElement(copy);
     } catch (ClassCastException e) {
-        // Enrich error message
-        ClassCastException replace = new ClassCastException(
-            String.format(
-                "%s. Failed to push OutputTag with id '%s' to operator. " +
-                "This can occur when multiple OutputTags with different types " +
-                "but identical names are being used.",
-                e.getMessage(),
-                outputTag.getId()));
-
-        throw new ExceptionInChainedOperatorException(replace);
+        ClassCastException replace;
+        if (outputTag != null) {
+            // Enrich error message
+            replace = new ClassCastException(
+                String.format(
+                    "%s. Failed to push OutputTag with id '%s' to operator. " +
+                    "This can occur when multiple OutputTags with different types " +
+                    "but identical names are being used.",
+                    e.getMessage(),
+                    outputTag.getId()));
+
+            throw new ExceptionInChainedOperatorException(replace);
+        } else {
+            replace = new ClassCastException(
+                String.format(
+                    "%s. Failed to push OutputTag with id '%s' to operator. " +
--- End diff --

Please properly read my comments. Just replace the else block with `throw e`.

If the OutputTag is null there's no point in modifying the error message.
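
The shape the reviewer asks for can be sketched as follows. This is an illustration under stated assumptions, not the merged change: `Tag` is a hypothetical stand-in for Flink's `OutputTag`, the `ExceptionInChainedOperatorException` wrapper is left out for brevity, and `push` and `messageFor` are made-up names.

```java
final class EnrichOrRethrowDemo {
    /** Hypothetical stand-in for Flink's OutputTag. */
    static final class Tag {
        private final String id;
        Tag(String id) { this.id = id; }
        String getId() { return id; }
    }

    /** Enriches the message only when a tag exists; otherwise rethrows e unchanged. */
    static void push(Tag outputTag, Runnable operator) {
        try {
            operator.run();
        } catch (ClassCastException e) {
            if (outputTag != null) {
                throw new ClassCastException(String.format(
                    "%s. Failed to push OutputTag with id '%s' to operator.",
                    e.getMessage(), outputTag.getId()));
            } else {
                throw e; // no tag, so there is nothing useful to add to the message
            }
        }
    }

    /** Returns the message of the exception that escapes push(). */
    static String messageFor(Tag outputTag) {
        try {
            push(outputTag, () -> { throw new ClassCastException("bad cast"); });
            return "no exception";
        } catch (ClassCastException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(messageFor(null));                 // original message, untouched
        System.out.println(messageFor(new Tag("late-data"))); // enriched message
    }
}
```

Either way the caller still receives a `ClassCastException`; only the message differs depending on whether a tag is available.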



[jira] [Commented] (FLINK-8423) OperatorChain#pushToOperator catch block may fail with NPE

2018-02-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16359714#comment-16359714
 ] 

ASF GitHub Bot commented on FLINK-8423:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5447
  
@zentol I have corrected the code; please help review it. Thank you very
much!




[jira] [Commented] (FLINK-8423) OperatorChain#pushToOperator catch block may fail with NPE

2018-02-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16359369#comment-16359369
 ] 

ASF GitHub Bot commented on FLINK-8423:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5447#discussion_r167395827
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -591,16 +591,18 @@ public void collect(StreamRecord record) {
         operator.setKeyContextElement1(copy);
         operator.processElement(copy);
     } catch (ClassCastException e) {
-        // Enrich error message
-        ClassCastException replace = new ClassCastException(
-            String.format(
-                "%s. Failed to push OutputTag with id '%s' to operator. " +
-                "This can occur when multiple OutputTags with different types " +
-                "but identical names are being used.",
-                e.getMessage(),
-                outputTag.getId()));
-
-        throw new ExceptionInChainedOperatorException(replace);
+        if (outputTag != null) {
--- End diff --

Thanks, @zentol. That is a good approach!




[jira] [Commented] (FLINK-8423) OperatorChain#pushToOperator catch block may fail with NPE

2018-02-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16359363#comment-16359363
 ] 

ASF GitHub Bot commented on FLINK-8423:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5447#discussion_r167395295
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -591,16 +591,18 @@ public void collect(StreamRecord record) {
         operator.setKeyContextElement1(copy);
         operator.processElement(copy);
     } catch (ClassCastException e) {
-        // Enrich error message
-        ClassCastException replace = new ClassCastException(
-            String.format(
-                "%s. Failed to push OutputTag with id '%s' to operator. " +
-                "This can occur when multiple OutputTags with different types " +
-                "but identical names are being used.",
-                e.getMessage(),
-                outputTag.getId()));
-
-        throw new ExceptionInChainedOperatorException(replace);
+        if (outputTag != null) {
--- End diff --

Neither. Just add an else block that re-throws the original exception.




[jira] [Commented] (FLINK-8423) OperatorChain#pushToOperator catch block may fail with NPE

2018-02-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16359336#comment-16359336
 ] 

ASF GitHub Bot commented on FLINK-8423:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5447#discussion_r167393568
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -591,16 +591,18 @@ public void collect(StreamRecord record) {
         operator.setKeyContextElement1(copy);
         operator.processElement(copy);
     } catch (ClassCastException e) {
-        // Enrich error message
-        ClassCastException replace = new ClassCastException(
-            String.format(
-                "%s. Failed to push OutputTag with id '%s' to operator. " +
-                "This can occur when multiple OutputTags with different types " +
-                "but identical names are being used.",
-                e.getMessage(),
-                outputTag.getId()));
-
-        throw new ExceptionInChainedOperatorException(replace);
+        if (outputTag != null) {
--- End diff --

Okay. So, should we catch the NPE with a try/catch, or just output the null id?




[jira] [Commented] (FLINK-8423) OperatorChain#pushToOperator catch block may fail with NPE

2018-02-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16359327#comment-16359327
 ] 

ASF GitHub Bot commented on FLINK-8423:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5447#discussion_r167392891
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -591,16 +591,18 @@ public void collect(StreamRecord record) {
         operator.setKeyContextElement1(copy);
         operator.processElement(copy);
     } catch (ClassCastException e) {
-        // Enrich error message
-        ClassCastException replace = new ClassCastException(
-            String.format(
-                "%s. Failed to push OutputTag with id '%s' to operator. " +
-                "This can occur when multiple OutputTags with different types " +
-                "but identical names are being used.",
-                e.getMessage(),
-                outputTag.getId()));
-
-        throw new ExceptionInChainedOperatorException(replace);
+        if (outputTag != null) {
--- End diff --

You are now completely swallowing the exception if the outputTag is null,
which is unacceptable.
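
The objection above can be illustrated with a small sketch. The diff only shows the opening `if (outputTag != null) {` line of that revision, so the body below is an assumption inferred from the review remark; `pushSwallowing` and `returnsNormally` are hypothetical names, and a plain `RuntimeException` stands in for Flink's wrapper exception.

```java
final class SwallowedExceptionDemo {
    /** Mimics a catch block with a null check but no else branch (assumed shape). */
    static boolean pushSwallowing(Object outputTag, Runnable operator) {
        try {
            operator.run();
        } catch (ClassCastException e) {
            if (outputTag != null) {
                throw new RuntimeException("enriched: " + e.getMessage());
            }
            // outputTag == null: control falls out of the catch block and e is lost
        }
        return true; // the caller cannot tell that the operator failed
    }

    /** Reports whether a failing operator goes unnoticed for the given tag. */
    static boolean returnsNormally(Object outputTag) {
        try {
            return pushSwallowing(outputTag, () -> { throw new ClassCastException("bad cast"); });
        } catch (RuntimeException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(returnsNormally(null));  // true: the failure was silently dropped
        System.out.println(returnsNormally("tag")); // false: the failure is reported
    }
}
```

A record that failed processing would therefore look like a success whenever no side output is configured, which is why an else branch is needed.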




[jira] [Commented] (FLINK-8423) OperatorChain#pushToOperator catch block may fail with NPE

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16359213#comment-16359213
 ] 

ASF GitHub Bot commented on FLINK-8423:
---

GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/5447

[FLINK-8423] OperatorChain#pushToOperator catch block may fail with NPE

## What is the purpose of the change
Fix the NPE when outputTag is null.


## Brief change log
Only enrich the error message in the catch block when outputTag is not null.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-8423

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5447.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5447


commit 65ac580ca5a3b4c78c954ff08eef9bedcb1e9713
Author: zhangminglei 
Date:   2018-02-10T02:32:43Z

[FLINK-8423] OperatorChain#pushToOperator catch block may fail with NPE






[jira] [Commented] (FLINK-8423) OperatorChain#pushToOperator catch block may fail with NPE

2018-02-09 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16359208#comment-16359208
 ] 

mingleizhang commented on FLINK-8423:
-

Hi [~Zentol], will this NPE cause the JVM to die?



[jira] [Commented] (FLINK-8423) OperatorChain#pushToOperator catch block may fail with NPE

2018-02-09 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358322#comment-16358322
 ] 

mingleizhang commented on FLINK-8423:
-

I will fix this issue.
