[jira] [Comment Edited] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974012#comment-15974012 ]

mingleizhang edited comment on FLINK-6311 at 4/19/17 10:02 AM:
---

[~tzulitai] I just looked at FlinkKinesisConsumer, which is in the package {code}org.apache.flink.streaming.connectors.kinesis{code} in the module flink-connector-kinesis. That module sits under flink-connectors, but the flink-connectors pom.xml does not contain {code}flink-connector-kinesis{code}. I think we should add the module {code}flink-connector-kinesis{code} to the flink-connectors pom.xml and then return to this issue. What do you think?

> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
> Issue Type: Bug
> Components: Kinesis Connector
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: mingleizhang
>
> This was reported by a user:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected
> against the condition when the source was closed before it started running.
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974213#comment-15974213 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-6311 at 4/19/17 7:18 AM:
-

I think we can just do:
{code}
if (mainThread != null) {
    mainThread.interrupt();
}
{code}
and likewise for the executor.

The reason is that the consumer can be closed before these fields reference actual values (they are assigned after run() starts, but there is no guarantee about when close() is called on the consumer). So we should just add null checks to safeguard against such conditions.
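
A minimal sketch of the null checks suggested in this comment, applied to both fields. The class name NullSafeFetcherSketch, the simplified run() method, and the isRunning() helper are assumptions made for illustration; this is not the actual KinesisDataFetcher code, only a model of its shutdown path.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for KinesisDataFetcher; only the shutdown path is modeled.
class NullSafeFetcherSketch {
    // Both fields stay null until run() assigns them.
    private volatile Thread mainThread;
    private volatile ExecutorService shardConsumersExecutor;
    private volatile boolean running = true;

    // Simplified stand-in for the start of the fetcher's run logic (assumption).
    void run() {
        mainThread = Thread.currentThread();
        shardConsumersExecutor = Executors.newCachedThreadPool();
    }

    void shutdownFetcher() {
        running = false;

        // close() may arrive before run(), so either field can still be null here.
        if (mainThread != null) {
            mainThread.interrupt(); // the main thread may be sleeping for the discovery interval
        }
        if (shardConsumersExecutor != null) {
            shardConsumersExecutor.shutdownNow();
        }
    }

    boolean isRunning() {
        return running;
    }

    public static void main(String[] args) {
        // Closed before run(): both fields are null and no exception is thrown.
        NullSafeFetcherSketch neverStarted = new NullSafeFetcherSketch();
        neverStarted.shutdownFetcher();

        // Closed after run(): the thread is interrupted and the executor is shut down.
        NullSafeFetcherSketch started = new NullSafeFetcherSketch();
        started.run();
        started.shutdownFetcher();
        Thread.interrupted(); // clear the interrupt flag set on the current thread

        System.out.println("both shutdown paths completed");
    }
}
```

With the guards in place, a shutdown that arrives before run() simply skips the interrupt and the executor shutdown instead of throwing.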
[jira] [Comment Edited] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974207#comment-15974207 ]

mingleizhang edited comment on FLINK-6311 at 4/19/17 7:14 AM:
--

[~tzulitai] Hi, here is my sample code. Please check it out and let me know what you think. BTW, should we use more detailed error messages than just "mainThread is null"? If that is necessary, what kind of messages should we put here? Thanks ~ :D
{code}
public void shutdownFetcher() {
    running = false;

    checkNotNull(mainThread, "mainThread is null.");
    mainThread.interrupt(); // the main thread may be sleeping for the discovery interval

    if (LOG.isInfoEnabled()) {
        LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
    }

    checkNotNull(shardConsumersExecutor, "shardConsumersExecutor is null.");
    shardConsumersExecutor.shutdownNow();
}
{code}
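
A short sketch of how a precondition-style check behaves when the field is still null. It assumes that checkNotNull in the sample code acts like the usual precondition helpers, which throw a NullPointerException carrying the given message; java.util.Objects.requireNonNull is used here as a stand-in, and the class and method names are hypothetical.

```java
import java.util.Objects;

// Hypothetical model of the proposed shutdown path with a precondition check.
class CheckNotNullSketch {
    // Never assigned, as when close() arrives before run().
    private Thread mainThread;

    void shutdownWithPrecondition() {
        // Stand-in for checkNotNull(mainThread, "mainThread is null.") (assumption).
        Objects.requireNonNull(mainThread, "mainThread is null.");
        mainThread.interrupt();
    }

    // Returns the exception message if shutdown failed with an NPE, or null if it succeeded.
    static String tryShutdown() {
        try {
            new CheckNotNullSketch().shutdownWithPrecondition();
            return null;
        } catch (NullPointerException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("shutdown failed with NPE: " + tryShutdown());
    }
}
```

Under that assumption the check only changes the exception message: a shutdown before run() still ends in a NullPointerException, so a more detailed message would not prevent the failure reported in this issue.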
[jira] [Comment Edited] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974207#comment-15974207 ]

mingleizhang edited comment on FLINK-6311 at 4/19/17 7:13 AM:
--

[~tzulitai] Hi, here is my sample code. Please check it out and let me know what you think. BTW, should we use more detailed error messages than just "mainThread is null"? If we really should, what kind of messages should we put here? Thanks ~ :D
{code}
public void shutdownFetcher() {
    running = false;

    checkNotNull(mainThread, "mainThread is null.");
    mainThread.interrupt(); // the main thread may be sleeping for the discovery interval

    if (LOG.isInfoEnabled()) {
        LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
    }

    checkNotNull(shardConsumersExecutor, "shardConsumersExecutor is null.");
    shardConsumersExecutor.shutdownNow();
}
{code}