[jira] [Comment Edited] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-19 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974012#comment-15974012
 ] 

mingleizhang edited comment on FLINK-6311 at 4/19/17 10:02 AM:
---

[~tzulitai] I just watch FlinkKinesisConsumer which under the package of 
{code}org.apache.flink.streaming.connectors.kinesis{code} in the module 
flink-connector-kinesis.  And the flink-connector-kinesis is under the module 
of flink-connectors in which pom.xml does not  contains the 
{code}flink-connector-kinesis{code}. I would think we should add the module 
{code} flink-connector-kinesis {code} in flink-connectors pom.xml and then 
return to this issue. How do you think of this ? 


was (Author: mingleizhang):
[~tzulitai] I just watch FlinkKinesisConsumer which under the package of 
{code}org.apache.flink.streaming.connectors.kinesis{code} in the module 
flink-connector-kinesis.  And the flink-connector-kinesis is under the module 
of flink-connectors in which pom.xml dont contains the 
{code}flink-connector-kinesis{code}. I would think we should add the module 
{code} flink-connector-kinesis {code} in flink-connectors pom.xml and then 
return to this issue. How do you think of this ? 

> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: mingleizhang
>
> This was reported by an user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the condition when the source was closed before it started running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-19 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974213#comment-15974213
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-6311 at 4/19/17 7:18 AM:
-

I think we can just do:

{code}
if (mainThread != null) {
mainThread.interrupt();
}
{code}

and likewise for the executor.

The reason is that there can be cases that the consumer is closed before these 
fields reference actual values (they are assigned after run() starts, but there 
is no guarantee when close() is called on the consumer). So, we should just do 
null checks to safe guard against such conditions.


was (Author: tzulitai):
I think we can just do:

{code}
if (mainThread != null) {
mainThread.interrupt();
}
{code}

and likewise for the executor.

The reason is that there can be cases that the consumer is closed before this 
fields reference actual values (they are assigned after run() starts, but there 
is no guarantee when close() is called on the consumer). So, we should just do 
null checks to safe guard against such conditions.

> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: mingleizhang
>
> This was reported by an user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the condition when the source was closed before it started running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-19 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974207#comment-15974207
 ] 

mingleizhang edited comment on FLINK-6311 at 4/19/17 7:14 AM:
--

[~tzulitai] Hi, here is the sample code I put here, please check it out and how 
do you think about it ? BTW,  Should we put more error messages instead of just 
"mainThread is null" ? If it does necessary, what kinda messages should we put 
it here? Thanks ~ :D
{code}
public void shutdownFetcher() {
running = false;
checkNotNull(mainThread, "mainThread is null.");
mainThread.interrupt(); // the main thread may be sleeping for 
the discovery interval

if (LOG.isInfoEnabled()) {
LOG.info("Shutting down the shard consumer threads of 
subtask {} ...", indexOfThisConsumerSubtask);
}
checkNotNull(shardConsumersExecutor, "shardConsumersExecutor is 
null.");
shardConsumersExecutor.shutdownNow();
}
{code}


was (Author: mingleizhang):
[~tzulitai] Hi, here is the sample code I put here, please check it out and how 
do you think about it ? BTW,  Should we put more error messages instead of just 
"mainThread is null" ? If it does really, what kinda messages should we put it 
here? Thanks ~ :D
{code}
public void shutdownFetcher() {
running = false;
checkNotNull(mainThread, "mainThread is null.");
mainThread.interrupt(); // the main thread may be sleeping for 
the discovery interval

if (LOG.isInfoEnabled()) {
LOG.info("Shutting down the shard consumer threads of 
subtask {} ...", indexOfThisConsumerSubtask);
}
checkNotNull(shardConsumersExecutor, "shardConsumersExecutor is 
null.");
shardConsumersExecutor.shutdownNow();
}
{code}

> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: mingleizhang
>
> This was reported by an user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the condition when the source was closed before it started running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-19 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974207#comment-15974207
 ] 

mingleizhang edited comment on FLINK-6311 at 4/19/17 7:13 AM:
--

[~tzulitai] Hi, here is the sample code I put here, please check it out and how 
do you think about it ? BTW,  Should we put more error messages instead of just 
"mainThread is null" ? If it does really, what kinda messages should we put it 
here? Thanks ~ :D
{code}
public void shutdownFetcher() {
running = false;
checkNotNull(mainThread, "mainThread is null.");
mainThread.interrupt(); // the main thread may be sleeping for 
the discovery interval

if (LOG.isInfoEnabled()) {
LOG.info("Shutting down the shard consumer threads of 
subtask {} ...", indexOfThisConsumerSubtask);
}
checkNotNull(shardConsumersExecutor, "shardConsumersExecutor is 
null.");
shardConsumersExecutor.shutdownNow();
}
{code}


was (Author: mingleizhang):
[~tzulitai] Hi, here is the sample code I put here, please check it out and how 
do you think about it ? BTW,  Should we put more error messages instead of just 
"mainThread is null" ? If it is really, what kinda message should we put it 
here? Thanks ~ :D
{code}
public void shutdownFetcher() {
running = false;
checkNotNull(mainThread, "mainThread is null.");
mainThread.interrupt(); // the main thread may be sleeping for 
the discovery interval

if (LOG.isInfoEnabled()) {
LOG.info("Shutting down the shard consumer threads of 
subtask {} ...", indexOfThisConsumerSubtask);
}
checkNotNull(shardConsumersExecutor, "shardConsumersExecutor is 
null.");
shardConsumersExecutor.shutdownNow();
}
{code}

> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: mingleizhang
>
> This was reported by an user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the condition when the source was closed before it started running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)