[jira] [Commented] (FLINK-8580) No easy way (or issues when trying?) to handle multiple yarn sessions and choose at runtime the one to submit a ha streaming job

2020-08-14 Thread Arnaud Linz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-8580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177709#comment-17177709
 ] 

Arnaud Linz commented on FLINK-8580:


Actually my cluster is a Hadoop (YARN) cluster, not a set of standalone clusters. I am 
just trying to launch each streaming application in a separate YARN container.

> No easy way (or issues when trying?) to handle multiple yarn sessions and 
> choose at runtime the one to submit a ha streaming job
> 
>
> Key: FLINK-8580
> URL: https://issues.apache.org/jira/browse/FLINK-8580
> Project: Flink
>  Issue Type: Improvement
>  Components: Command Line Client, Deployment / YARN, Runtime / 
> Coordination
>Affects Versions: 1.3.2
>Reporter: Arnaud Linz
>Priority: Major
>
> Hello,
> I am using Flink 1.3.2 and I’m struggling to achieve something that should be 
> simple.
> For isolation reasons, I want to start multiple long-lived YARN session 
> containers (with the same user) and choose at run time, when I start an HA 
> streaming app, which container will host it.
> I start my YARN session with the command-line option 
> -Dyarn.properties-file.location=mydir
> The session is created and a .yarn-properties-$USER file is generated.
> And I’ve tried the following to submit my job:
>  
> *CASE 1*
> *flink-conf.yaml* : yarn.properties-file.location: mydir
>  *flink run options* : none
>  * Uses ZooKeeper and works, but I cannot choose the container because the 
> property file is global.
>  
> *CASE 2*
> *flink-conf.yaml* : nothing
>  *flink run options* : -yid applicationId
>  * Does not use ZooKeeper; tries to connect to the YARN JobManager but fails 
> with a “Job submission to the JobManager timed out” error
>  
> *CASE 3*
> *flink-conf.yaml* : nothing
>  *flink run options* : -yid applicationId and -yD with all dynamic properties 
> found in the “dynamicPropertiesString” of the .yarn-properties-$USER file
>  * Same as case 2
>  
> *CASE 4*
> *flink-conf.yaml* : nothing
>  *flink run options* : -yD yarn.properties-file.location=mydir
>  * Tries to connect to the local (non-YARN) JobManager (and fails)
>  
> *CASE 5*
> Even weirder:
> *flink-conf.yaml* : yarn.properties-file.location: mydir
>  *flink run options* : -yD yarn.properties-file.location=mydir
>  * Still tries to connect to the local (non-YARN) JobManager!
>  
> Without any other solution, I've made a shell script that copies the original 
> content of FLINK_CONF_DIR into a temporary dir, modifies flink-conf.yaml to set 
> yarn.properties-file.location, and changes FLINK_CONF_DIR to that temp dir 
> before executing flink to submit the job.
> I am now able to select the container I want, but I think it should be made 
> simpler…
>  
> Logs extracts:
> *CASE 1:*
> {{2018:02:01 15:43:20 - Waiting until all TaskManagers have connected}}
> {{2018:02:01 15:43:20 - Starting client actor system.}}
> {{2018:02:01 15:43:20 - Starting ZooKeeperLeaderRetrievalService.}}
> {{2018:02:01 15:43:20 - Trying to select the network interface and address to use by connecting to the leading JobManager.}}
> {{2018:02:01 15:43:20 - TaskManager will try to connect for 1 milliseconds before falling back to heuristics}}
> {{2018:02:01 15:43:21 - Retrieved new target address elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr/10.136.170.193:33970.}}
> {{2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.}}
> {{2018:02:01 15:43:21 - Slf4jLogger started}}
> {{2018:02:01 15:43:21 - Starting remoting}}
> {{2018:02:01 15:43:21 - Remoting started; listening on addresses :[akka.tcp://fl...@elara-edge-u2-n01.dev.mlb.jupiter.nbyt.fr:36340]}}
> {{2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.}}
> {{2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.}}
> {{2018:02:01 15:43:21 - TaskManager status (2/1)}}
> {{2018:02:01 15:43:21 - All TaskManagers are connected}}
> {{2018:02:01 15:43:21 - Submitting job with JobID: f69197b0b80a76319a87bde10c1e3f77. Waiting for job completion.}}
> {{2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.}}
> {{2018:02:01 15:43:21 - Received SubmitJobAndWait(JobGraph(jobId: f69197b0b80a76319a87bde10c1e3f77)) but there is no connection to a JobManager yet.}}
> {{2018:02:01 15:43:21 - Received job SND-IMP-SIGNAST (f69197b0b80a76319a87bde10c1e3f77).}}
> {{2018:02:01 15:43:21 - Disconnect from JobManager null.}}
> {{2018:02:01 15:43:21 - Connect to JobManager Actor[akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].}}
> {{2018:02:01 15:43:21 - Connected to JobManager at Actor[akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245] with leader session id 388af5b8--4923-8ee4-8a4b9bfbb0b9.}}
> {{2018:02:01 
> 

[jira] [Commented] (FLINK-16509) FlinkKafkaConsumerBase tries to log a context that may not have been initialized and fails

2020-03-10 Thread Arnaud Linz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17055651#comment-17055651
 ] 

Arnaud Linz commented on FLINK-16509:
-

I was too quick to open the case. It was a side effect of a test artifact. 
When everything is correct, a context is provided.
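
For context, a minimal sketch of the wiring that makes this work in a test (the 
mock setup and the consumer / initializationContext names are illustrative 
assumptions, not Flink's actual test code): the runtime context has to be 
injected into the rich function before initializeState() is called.

{code:java}
// Illustrative sketch (assumption): `consumer` stands for a
// FlinkKafkaConsumerBase instance and `initializationContext` for a
// FunctionInitializationContext prepared by the surrounding test harness.
RuntimeContext mockContext = org.mockito.Mockito.mock(RuntimeContext.class);
org.mockito.Mockito.when(mockContext.getIndexOfThisSubtask()).thenReturn(0);

consumer.setRuntimeContext(mockContext);         // inject the context first...
consumer.initializeState(initializationContext); // ...so the restore logging
                                                 // can call getRuntimeContext()
{code}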

> FlinkKafkaConsumerBase tries to log a context that may not have been 
> initialized and fails
> --
>
> Key: FLINK-16509
> URL: https://issues.apache.org/jira/browse/FLINK-16509
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.10.0
> Environment: Unit test on local cluster, calling a unit test local 
> kafka server.
>Reporter: Arnaud Linz
>Priority: Major
>
> New code of FlinkKafkaConsumerBase#initializeState(), logs restored state 
> with:
> {code:java}
> (...)
> LOG.info("Consumer subtask {} restored state: {}.", 
> getRuntimeContext().getIndexOfThisSubtask(), restoredState); 
> }
>  else { 
>LOG.info("Consumer subtask {} has no restore state.", 
> getRuntimeContext().getIndexOfThisSubtask()); 
> }
>   {code}
>  
> whereas the old (1.8.0) class logged without calling getRuntimeContext():
>  
> {code:java}
> (...)
>   LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", 
> restoredState); 
> }
> else {
>  LOG.info("No restore state for FlinkKafkaConsumer."); 
> }{code}
>  
> This causes a regression in my Kafka source unit test with exception: 
> {code:java}
> java.lang.IllegalStateException: The runtime context has not been initialized.
> at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
> {code}
>  
> As the context is not always available at that point (initializeState() being 
> called before open(), I guess).
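
A defensive variant of that logging is easy to sketch (illustration only, 
assuming the surrounding FlinkKafkaConsumerBase code; this is not the actual 
Flink change):

{code:java}
// Illustrative sketch (assumption): fall back to a placeholder index instead
// of failing the restore when the runtime context is not yet injected.
int subtaskIndex;
try {
    subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
} catch (IllegalStateException e) {
    subtaskIndex = -1; // runtime context not initialized yet
}
if (restoredState != null) {
    LOG.info("Consumer subtask {} restored state: {}.", subtaskIndex, restoredState);
} else {
    LOG.info("Consumer subtask {} has no restore state.", subtaskIndex);
}
{code}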



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-16509) FlinkKafkaConsumerBase tries to log a context that may not have been initialized and fails

2020-03-10 Thread Arnaud Linz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Linz closed FLINK-16509.
---
Release Note: It was a side effect. When everything goes right, we do have 
a context here.
  Resolution: Not A Problem

> FlinkKafkaConsumerBase tries to log a context that may not have been 
> initialized and fails
> --
>
> Key: FLINK-16509
> URL: https://issues.apache.org/jira/browse/FLINK-16509
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.10.0
> Environment: Unit test on local cluster, calling a unit test local 
> kafka server.
>Reporter: Arnaud Linz
>Priority: Major
>
> New code of FlinkKafkaConsumerBase#initializeState(), logs restored state 
> with:
> {code:java}
> (...)
> LOG.info("Consumer subtask {} restored state: {}.", 
> getRuntimeContext().getIndexOfThisSubtask(), restoredState); 
> }
>  else { 
>LOG.info("Consumer subtask {} has no restore state.", 
> getRuntimeContext().getIndexOfThisSubtask()); 
> }
>   {code}
>  
> whereas the old (1.8.0) class logged without calling getRuntimeContext():
>  
> {code:java}
> (...)
>   LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", 
> restoredState); 
> }
> else {
>  LOG.info("No restore state for FlinkKafkaConsumer."); 
> }{code}
>  
> This causes a regression in my Kafka source unit test with exception: 
> {code:java}
> java.lang.IllegalStateException: The runtime context has not been initialized.
> at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
> {code}
>  
> As the context is not always available at that point (initializeState() being 
> called before open(), I guess).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16509) FlinkKafkaConsumerBase tries to log a context that may not have been initialized and fails

2020-03-09 Thread Arnaud Linz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Linz updated FLINK-16509:

Description: 
New code of FlinkKafkaConsumerBase#initializeState(), logs restored state with:
{code:java}
(...)
LOG.info("Consumer subtask {} restored state: {}.", 
getRuntimeContext().getIndexOfThisSubtask(), restoredState); 
}
 else { 
   LOG.info("Consumer subtask {} has no restore state.", 
getRuntimeContext().getIndexOfThisSubtask()); 
}
  {code}
 

whereas the old (1.8.0) class logged without calling getRuntimeContext():

 
{code:java}
(...)
  LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", 
restoredState); 
}
else {
 LOG.info("No restore state for FlinkKafkaConsumer."); 
}{code}
 

This causes a regression in my Kafka source unit test with exception: 
{code:java}
java.lang.IllegalStateException: The runtime context has not been initialized.
at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
{code}
 

As the context is not always available at that point (initializeState() being 
called before open(), I guess).
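
That guess matches the ordering visible in the stack trace. A condensed sketch 
of the startup sequence (distilled from the trace above as an illustration, not 
Flink's actual source):

{code:java}
// Illustrative sketch (assumption): the task first restores state, then opens
// the operator, so initializeState() runs before open() and must not rely on
// anything that is only guaranteed after open().
void startTask(AbstractUdfStreamOperator<?, ?> operator) throws Exception {
    operator.initializeState(); // -> userFunction.initializeState(...):
                                //    getRuntimeContext() can throw here
    operator.open();            // the user function is only opened afterwards
}
{code}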

  was:
New code of FlinkKafkaConsumerBase#initializeState(), logs restored state with:
{code:java}
(...)
LOG.info("Consumer subtask {} restored state: {}.", 
getRuntimeContext().getIndexOfThisSubtask(), restoredState); 
}
 else { 
   LOG.info("Consumer subtask {} has no restore state.", 
getRuntimeContext().getIndexOfThisSubtask()); 
}
  {code}
 

whereas the old (1.8.0) class logged without calling getRuntimeContext():

 
{code:java}
(...)
  LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", 
restoredState); 
}
else {
 LOG.info("No restore state for FlinkKafkaConsumer."); 
}{code}
 

This causes a regression in my Kafka source unit test with exception: 
{code:java}
java.lang.IllegalStateException: The runtime context has not been initialized.
at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
{code}
 

As the context is not always available at that point (initializeState() being 
called before open(), I guess).


> FlinkKafkaConsumerBase tries to log a context that may not have 

[jira] [Updated] (FLINK-16509) FlinkKafkaConsumerBase tries to log a context that may not have been initialized and fails

2020-03-09 Thread Arnaud Linz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Linz updated FLINK-16509:

Description: 
New code of FlinkKafkaConsumerBase#initializeState(), logs restored state with:
{code:java}
(...)
LOG.info("Consumer subtask {} restored state: {}.", 
getRuntimeContext().getIndexOfThisSubtask(), restoredState); 
}
 else { 
   LOG.info("Consumer subtask {} has no restore state.", 
getRuntimeContext().getIndexOfThisSubtask()); 
}
  {code}
 

whereas the old (1.8.0) class logged without calling getRuntimeContext():

 
{code:java}
(...)
  LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", 
restoredState); 
}
else {
 LOG.info("No restore state for FlinkKafkaConsumer."); 
}{code}
 

This causes a regression in my Kafka source unit test with exception: 
{code:java}
java.lang.IllegalStateException: The runtime context has not been initialized.
at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
{code}
 

As the context is not always available at that point (initializeState() being 
called before open(), I guess).

  was:
New code of FlinkKafkaConsumerBase#initializeState(), logs restored state with:

{{ LOG.info("Consumer subtask {} restored state: {}.", 
getRuntimeContext().getIndexOfThisSubtask(), restoredState); } else \{ 
LOG.info("Consumer subtask {} has no restore state.", 
getRuntimeContext().getIndexOfThisSubtask()); }}}

 

where as old (1.8.0) class was logging without calling getRuntimeContext :

 

{{ LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", 
restoredState); } else \{ LOG.info("No restore state for FlinkKafkaConsumer."); 
}}}

 

This causes a regression in my Kafka source unit test with exception:


{{ java.lang.IllegalStateException: The runtime context has not been 
initialized.}}
{{ at 
org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)}}
{{ at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)}}

{{ As the context is not always available at that point (initalizeState being 
called before open I guess)}}
{{  }}
{{  }}


> FlinkKafkaConsumerBase tries to log a context that may not have been 
> initialized and fails
> --
>
> Key: FLINK-16509
> URL: https://issues.apache.org/jira/browse/FLINK-16509
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.10.0
> Environment: Unit test on local cluster, calling a unit test local 
> kafka server.
>Reporter: Arnaud Linz
>Priority: Major
>
> New code of FlinkKafkaConsumerBase#initializeState(), logs restored state 
> with:
> {code:java}
> (...)
> LOG.info("Consumer subtask {} restored state: {}.", 
> getRuntimeContext().getIndexOfThisSubtask(), restoredState); 
> }
>  else { 
>LOG.info("Consumer subtask {} has no restore state.", 
> getRuntimeContext().getIndexOfThisSubtask()); 
> }
>   {code}
>  
> whereas the old (1.8.0) class logged without calling getRuntimeContext():
>  
> {code:java}
> (...)
>   LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", 
> restoredState); 
> }
> else {
>  LOG.info("No restore state for FlinkKafkaConsumer."); 
> }{code}
>  
> This causes a regression in my Kafka source unit test with exception: 
> {code:java}
> java.lang.IllegalStateException: The runtime context has 

[jira] [Updated] (FLINK-16509) FlinkKafkaConsumerBase tries to log a context that may not have been initialized and fails

2020-03-09 Thread Arnaud Linz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Linz updated FLINK-16509:

Description: 
New code of FlinkKafkaConsumerBase#initializeState(), logs restored state with:
{code:java}
(...)
LOG.info("Consumer subtask {} restored state: {}.", 
getRuntimeContext().getIndexOfThisSubtask(), restoredState); 
}
 else { 
   LOG.info("Consumer subtask {} has no restore state.", 
getRuntimeContext().getIndexOfThisSubtask()); 
}
  {code}
 

whereas the old (1.8.0) class logged without calling getRuntimeContext():

 
{code:java}
(...)
  LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", 
restoredState); 
}
else {
 LOG.info("No restore state for FlinkKafkaConsumer."); 
}{code}
 

This causes a regression in my Kafka source unit test with exception: 
{code:java}
java.lang.IllegalStateException: The runtime context has not been initialized.
at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
{code}
 

As the context is not always available at that point (initializeState() being 
called before open(), I guess).

  was:
New code of FlinkKafkaConsumerBase#initializeState(), logs restored state with:
{code:java}
(...)
LOG.info("Consumer subtask {} restored state: {}.", 
getRuntimeContext().getIndexOfThisSubtask(), restoredState); 
}
 else { 
   LOG.info("Consumer subtask {} has no restore state.", 
getRuntimeContext().getIndexOfThisSubtask()); 
}
  {code}
 

whereas the old (1.8.0) class logged without calling getRuntimeContext():

 
{code:java}
(...)
  LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", 
restoredState); 
}
else {
 LOG.info("No restore state for FlinkKafkaConsumer."); 
}{code}
 

This causes a regression in my Kafka source unit test with exception: 
{code:java}
java.lang.IllegalStateException: The runtime context has not been initialized.
at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
{code}
 

As the context is not always available at that point (initializeState() being 
called before open(), I guess).


> FlinkKafkaConsumerBase tries to log a context that may not have 

[jira] [Updated] (FLINK-16509) FlinkKafkaConsumerBase tries to log a context that may not have been initialized and fails

2020-03-09 Thread Arnaud Linz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Linz updated FLINK-16509:

Description: 
New code of FlinkKafkaConsumerBase#initializeState(), logs restored state with:

{{ LOG.info("Consumer subtask {} restored state: {}.", 
getRuntimeContext().getIndexOfThisSubtask(), restoredState); } else \{ 
LOG.info("Consumer subtask {} has no restore state.", 
getRuntimeContext().getIndexOfThisSubtask()); }}}

 

where as old (1.8.0) class was logging without calling getRuntimeContext :

 

{{ LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", 
restoredState); } else \{ LOG.info("No restore state for FlinkKafkaConsumer."); 
}}}

 

This causes a regression in my Kafka source unit test with exception:


{{ java.lang.IllegalStateException: The runtime context has not been 
initialized.}}
{{ at 
org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)}}
{{ at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)}}

{{ As the context is not always available at that point (initalizeState being 
called before open I guess)}}
{{  }}
{{  }}

  was:
New code of FlinkKafkaConsumerBase#initializeState(), logs restored state with:

```
(...)
LOG.info("Consumer subtask {} restored state: {}.",
    getRuntimeContext().getIndexOfThisSubtask(), restoredState);
} else {
LOG.info("Consumer subtask {} has no restore state.",
    getRuntimeContext().getIndexOfThisSubtask());
}
```

whereas the old (1.8.0) class logged without calling getRuntimeContext():

```
(...)
LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState);
} else {
LOG.info("No restore state for FlinkKafkaConsumer.");
}
```

This causes a regression in my Kafka source unit test with exception:

```
java.lang.IllegalStateException: The runtime context has not been initialized.
at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)
```

As the context is not always available at that point (initializeState() being 
called before open(), I guess).


> FlinkKafkaConsumerBase tries to log a context that may not have been 
> initialized and fails
> --
>
> Key: FLINK-16509
> URL: https://issues.apache.org/jira/browse/FLINK-16509
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.10.0
> Environment: Unit test on local cluster, calling a unit test local 
> kafka server.
>Reporter: Arnaud Linz
>Priority: Major
>
> New code of FlinkKafkaConsumerBase#initializeState(), logs restored state 
> with:
> {{ LOG.info("Consumer subtask {} restored state: {}.", 
> getRuntimeContext().getIndexOfThisSubtask(), restoredState); } else \{ 
> LOG.info("Consumer subtask {} has no restore state.", 
> getRuntimeContext().getIndexOfThisSubtask()); }}}
>  
> where as old (1.8.0) class was logging without calling getRuntimeContext :
>  
> {{ LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", 
> restoredState); } else \{ LOG.info("No restore state for 
> FlinkKafkaConsumer."); }}}
>  
> This causes a regression in my Kafka source unit test with exception:
> {{ java.lang.IllegalStateException: The runtime context has not been 
> initialized.}}
> {{ at 
> org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)}}
> {{ at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)}}
> {{ As the context is not always available at that point (initalizeState being 
> called before open I guess)}}
> {{  }}
> {{  }}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16509) FlinkKafkaConsumerBase tries to log a context that may not have been initialized and fails

2020-03-09 Thread Arnaud Linz (Jira)
Arnaud Linz created FLINK-16509:
---

 Summary: FlinkKafkaConsumerBase tries to log a context that may 
not have been initialized and fails
 Key: FLINK-16509
 URL: https://issues.apache.org/jira/browse/FLINK-16509
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.10.0
 Environment: Unit test on local cluster, calling a unit test local 
kafka server.
Reporter: Arnaud Linz


New code of FlinkKafkaConsumerBase#initializeState(), logs restored state with:

```
(...)
LOG.info("Consumer subtask {} restored state: {}.",
    getRuntimeContext().getIndexOfThisSubtask(), restoredState);
} else {
LOG.info("Consumer subtask {} has no restore state.",
    getRuntimeContext().getIndexOfThisSubtask());
}
```

whereas the old (1.8.0) class logged without calling getRuntimeContext():

```
(...)
LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState);
} else {
LOG.info("No restore state for FlinkKafkaConsumer.");
}
```

This causes a regression in my Kafka source unit test with exception:

```
java.lang.IllegalStateException: The runtime context has not been initialized.
at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)
```

As the context is not always available at that point (initializeState() being 
called before open(), I guess).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end

2018-12-07 Thread Arnaud Linz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Linz resolved FLINK-10832.
-
  Resolution: Won't Fix
Release Note: Not a Flink problem, but a JDK one; solved by upgrading it.

> StreamExecutionEnvironment.execute() does not return when all sources end
> -
>
> Key: FLINK-10832
> URL: https://issues.apache.org/jira/browse/FLINK-10832
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.5, 1.6.2
>Reporter: Arnaud Linz
>Priority: Critical
> Attachments: flink-10832.zip, log.txt
>
>
> In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), this code never ends:
> {code:java}
> public void testFlink() throws Exception {
>     // get the execution environment
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     // get input data
>     final DataStreamSource<String> text = env.addSource(new SourceFunction<String>() {
>         @Override
>         public void run(final SourceContext<String> ctx) throws Exception {
>             for (int count = 0; count < 5; count++) {
>                 ctx.collect(String.valueOf(count));
>             }
>         }
> 
>         @Override
>         public void cancel() {
>         }
>     });
>     text.print().setParallelism(1);
>     env.execute("Simple Test");
>     // Never ends!
> }
> {code}
>  
> It's critical for us as we heavily rely on this "source exhaustion stop" 
> mechanism to achieve a proper stop of streaming applications from their own 
> code, so it prevents us from using the latest Flink versions.
>  
> The log extract shows that the local cluster tried to shut down, but could 
> not do it for no apparent reason:
>  
> {{[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. 
> Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to 
> RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
>  {{[2018-11-07 11:11:13,840] INFO No state backend has been configured, using 
> default (Memory / JobManager) MemoryStateBackend (data in heap memory / 
> checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', 
> asynchronous: TRUE, maxStateSize: 5242880) 
> (org.apache.flink.streaming.runtime.tasks.StreamTask:230)}}
>  {{0}}
>  {{1}}
>  {{2}}
>  {{3}}
>  {{4}}
>  {{[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to 
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to 
> FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)}}
>  {{[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom 
> Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). 
> (org.apache.flink.runtime.taskmanager.Task:818)}}
>  {{[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed 
> for task Source: Custom Source -> Sink: Print to Std. Out (1/1) 
> (07ae66bef91de06205cf22a337ea1fe2) [FINISHED] 
> (org.apache.flink.runtime.taskmanager.Task:845)}}
>  {{[2018-11-07 11:11:13,899] INFO Un-registering task and sending final 
> execution state FINISHED to JobManager for task Source: Custom Source -> 
> Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. 
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)}}
>  {{[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to 
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to 
> FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
>  {{[2018-11-07 11:11:13,907] INFO Job Simple Test 
> (0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. 
> (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)}}
>  {{[2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job 
> 0ef8697ca98f6d2b565ed928d17c8a49. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)}}
>  {{[2018-11-07 11:11:13,908] INFO Shutting down 
> (org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)}}
>  {{[2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster 
> (org.apache.flink.runtime.minicluster.MiniCluster:427)}}
>  {{[2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. 
> (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)}}
>  {{[2018-11-07 11:11:23,582] INFO Stopping TaskExecutor 
> akka://flink/user/taskmanager_0. 
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:291)}}
>  {{[2018-11-07 11:11:23,583] INFO Shutting down 
> TaskExecutorLocalStateStoresManager. 
> (org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager:213)}}
>  {{[2018-11-07 11:11:23,591] INFO I/O manager removed spill file 

[jira] [Comment Edited] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end

2018-12-07 Thread Arnaud Linz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711803#comment-16711803
 ] 

Arnaud Linz edited comment on FLINK-10832 at 12/7/18 3:37 PM:
--

Okay, it seems to be a JDK-related issue.

- With 1.8.0_31 x64 (Windows) it fails.
- With 1.8.0_172-amd64 (CentOS) it works.
- Also tested OK with the latest JDK 1.8 for Windows.


was (Author: arnaudl):
Okay, it seems to be a JDK related issue. With 1.8.0_31x64 (windows) it fails.

with 1.8.0_172-amd64 (centOS) it works.

> StreamExecutionEnvironment.execute() does not return when all sources end
> -
>
> Key: FLINK-10832
> URL: https://issues.apache.org/jira/browse/FLINK-10832
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.5, 1.6.2
>Reporter: Arnaud Linz
>Priority: Critical
> Attachments: flink-10832.zip, log.txt
>
>
> In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), this code never ends:
> {code:java}
> public void testFlink() throws Exception {
>     // get the execution environment
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     // get input data
>     final DataStreamSource<String> text = env.addSource(new SourceFunction<String>() {
>         @Override
>         public void run(final SourceContext<String> ctx) throws Exception {
>             for (int count = 0; count < 5; count++) {
>                 ctx.collect(String.valueOf(count));
>             }
>         }
> 
>         @Override
>         public void cancel() {
>         }
>     });
>     text.print().setParallelism(1);
>     env.execute("Simple Test");
>     // Never ends!
> }
> {code}
>  
> It's critical for us as we heavily rely on this "source exhaustion stop" 
> mechanism to achieve a proper stop of streaming applications from their own 
> code, so it prevents us from using the latest Flink versions.
>  
> The log extract shows that the local cluster tried to shut down, but could 
> not do it for no apparent reason:
>  
> {{[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. 
> Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to 
> RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
>  {{[2018-11-07 11:11:13,840] INFO No state backend has been configured, using 
> default (Memory / JobManager) MemoryStateBackend (data in heap memory / 
> checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', 
> asynchronous: TRUE, maxStateSize: 5242880) 
> (org.apache.flink.streaming.runtime.tasks.StreamTask:230)}}
>  {{0}}
>  {{1}}
>  {{2}}
>  {{3}}
>  {{4}}
>  {{[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to 
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to 
> FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)}}
>  {{[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom 
> Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). 
> (org.apache.flink.runtime.taskmanager.Task:818)}}
>  {{[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed 
> for task Source: Custom Source -> Sink: Print to Std. Out (1/1) 
> (07ae66bef91de06205cf22a337ea1fe2) [FINISHED] 
> (org.apache.flink.runtime.taskmanager.Task:845)}}
>  {{[2018-11-07 11:11:13,899] INFO Un-registering task and sending final 
> execution state FINISHED to JobManager for task Source: Custom Source -> 
> Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. 
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)}}
>  {{[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to 
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to 
> FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
>  {{[2018-11-07 11:11:13,907] INFO Job Simple Test 
> (0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. 
> (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)}}
>  {{[2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job 
> 0ef8697ca98f6d2b565ed928d17c8a49. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)}}
>  {{[2018-11-07 11:11:13,908] INFO Shutting down 
> (org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)}}
>  {{[2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster 
> (org.apache.flink.runtime.minicluster.MiniCluster:427)}}
>  {{[2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. 
> (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)}}
>  {{[2018-11-07 11:11:23,582] INFO Stopping TaskExecutor 
> 

[jira] [Comment Edited] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end

2018-12-06 Thread Arnaud Linz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711803#comment-16711803
 ] 

Arnaud Linz edited comment on FLINK-10832 at 12/6/18 6:22 PM:
--

Okay, it seems to be a JDK related issue. With 1.8.0_31x64 (windows) it fails.

with 1.8.0_172-amd64 (centOS) it works.


was (Author: arnaudl):
Which JDK are you using?

> StreamExecutionEnvironment.execute() does not return when all sources end
> -
>
> Key: FLINK-10832
> URL: https://issues.apache.org/jira/browse/FLINK-10832
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.5, 1.6.2
>Reporter: Arnaud Linz
>Priority: Critical
> Attachments: flink-10832.zip, log.txt
>
>
> In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), this code never ends:
> {code:java}
> public void testFlink() throws Exception {
>     // get the execution environment
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     // get input data
>     final DataStreamSource<String> text = env.addSource(new SourceFunction<String>() {
>         @Override
>         public void run(final SourceContext<String> ctx) throws Exception {
>             for (int count = 0; count < 5; count++) {
>                 ctx.collect(String.valueOf(count));
>             }
>         }
> 
>         @Override
>         public void cancel() {
>         }
>     });
>     text.print().setParallelism(1);
>     env.execute("Simple Test");
>     // Never ends!
> }
> {code}
>  
> It's critical for us as we heavily rely on this "source exhaustion stop" 
> mechanism to achieve a proper stop of streaming applications from their own 
> code, so it prevents us from using the latest Flink versions.
>  
> The log extract shows that the local cluster tried to shut down, but could 
> not do it for no apparent reason:
>  
> {{[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. 
> Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to 
> RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
>  {{[2018-11-07 11:11:13,840] INFO No state backend has been configured, using 
> default (Memory / JobManager) MemoryStateBackend (data in heap memory / 
> checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', 
> asynchronous: TRUE, maxStateSize: 5242880) 
> (org.apache.flink.streaming.runtime.tasks.StreamTask:230)}}
>  {{0}}
>  {{1}}
>  {{2}}
>  {{3}}
>  {{4}}
>  {{[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to 
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to 
> FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)}}
>  {{[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom 
> Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). 
> (org.apache.flink.runtime.taskmanager.Task:818)}}
>  {{[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed 
> for task Source: Custom Source -> Sink: Print to Std. Out (1/1) 
> (07ae66bef91de06205cf22a337ea1fe2) [FINISHED] 
> (org.apache.flink.runtime.taskmanager.Task:845)}}
>  {{[2018-11-07 11:11:13,899] INFO Un-registering task and sending final 
> execution state FINISHED to JobManager for task Source: Custom Source -> 
> Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. 
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)}}
>  {{[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to 
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to 
> FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
>  {{[2018-11-07 11:11:13,907] INFO Job Simple Test 
> (0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. 
> (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)}}
>  {{[2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job 
> 0ef8697ca98f6d2b565ed928d17c8a49. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)}}
>  {{[2018-11-07 11:11:13,908] INFO Shutting down 
> (org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)}}
>  {{[2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster 
> (org.apache.flink.runtime.minicluster.MiniCluster:427)}}
>  {{[2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. 
> (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)}}
>  {{[2018-11-07 11:11:23,582] INFO Stopping TaskExecutor 
> akka://flink/user/taskmanager_0. 
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:291)}}
>  {{[2018-11-07 11:11:23,583] INFO Shutting down 
> 

[jira] [Commented] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end

2018-12-06 Thread Arnaud Linz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711803#comment-16711803
 ] 

Arnaud Linz commented on FLINK-10832:
-

Which JDK are you using?

> StreamExecutionEnvironment.execute() does not return when all sources end
> -
>
> Key: FLINK-10832
> URL: https://issues.apache.org/jira/browse/FLINK-10832
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.5, 1.6.2
>Reporter: Arnaud Linz
>Priority: Critical
> Attachments: flink-10832.zip, log.txt
>
>
> In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), this code never ends:
> {code:java}
> public void testFlink() throws Exception {
>     // get the execution environment
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     // get input data
>     final DataStreamSource<String> text = env.addSource(new SourceFunction<String>() {
>         @Override
>         public void run(final SourceContext<String> ctx) throws Exception {
>             for (int count = 0; count < 5; count++) {
>                 ctx.collect(String.valueOf(count));
>             }
>         }
> 
>         @Override
>         public void cancel() {
>         }
>     });
>     text.print().setParallelism(1);
>     env.execute("Simple Test");
>     // Never ends!
> }
> {code}
>  
> It's critical for us as we heavily rely on this "source exhaustion stop" 
> mechanism to achieve a proper stop of streaming applications from their own 
> code, so it prevents us from using the latest Flink versions.
>  
> The log extract shows that the local cluster tried to shut down, but could 
> not do it for no apparent reason:
>  
> {{[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. 
> Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to 
> RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
>  {{[2018-11-07 11:11:13,840] INFO No state backend has been configured, using 
> default (Memory / JobManager) MemoryStateBackend (data in heap memory / 
> checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', 
> asynchronous: TRUE, maxStateSize: 5242880) 
> (org.apache.flink.streaming.runtime.tasks.StreamTask:230)}}
>  {{0}}
>  {{1}}
>  {{2}}
>  {{3}}
>  {{4}}
>  {{[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to 
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to 
> FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)}}
>  {{[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom 
> Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). 
> (org.apache.flink.runtime.taskmanager.Task:818)}}
>  {{[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed 
> for task Source: Custom Source -> Sink: Print to Std. Out (1/1) 
> (07ae66bef91de06205cf22a337ea1fe2) [FINISHED] 
> (org.apache.flink.runtime.taskmanager.Task:845)}}
>  {{[2018-11-07 11:11:13,899] INFO Un-registering task and sending final 
> execution state FINISHED to JobManager for task Source: Custom Source -> 
> Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. 
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)}}
>  {{[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to 
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to 
> FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
>  {{[2018-11-07 11:11:13,907] INFO Job Simple Test 
> (0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. 
> (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)}}
>  {{[2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job 
> 0ef8697ca98f6d2b565ed928d17c8a49. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)}}
>  {{[2018-11-07 11:11:13,908] INFO Shutting down 
> (org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)}}
>  {{[2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster 
> (org.apache.flink.runtime.minicluster.MiniCluster:427)}}
>  {{[2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. 
> (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)}}
>  {{[2018-11-07 11:11:23,582] INFO Stopping TaskExecutor 
> akka://flink/user/taskmanager_0. 
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:291)}}
>  {{[2018-11-07 11:11:23,583] INFO Shutting down 
> TaskExecutorLocalStateStoresManager. 
> (org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager:213)}}
>  {{[2018-11-07 11:11:23,591] INFO I/O manager removed spill file directory 
> 

[jira] [Commented] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end

2018-11-30 Thread Arnaud Linz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16705012#comment-16705012
 ] 

Arnaud Linz commented on FLINK-10832:
-

I've corrected my corrupted Maven cache; now the run is fine.

However, it still does not end the local cluster with 1.7.0...

> StreamExecutionEnvironment.execute() does not return when all sources end
> -
>
> Key: FLINK-10832
> URL: https://issues.apache.org/jira/browse/FLINK-10832
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.5, 1.6.2
>Reporter: Arnaud Linz
>Priority: Critical
> Attachments: flink-10832.zip, log.txt
>
>
> In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), this code never ends:
> {code:java}
> public void testFlink() throws Exception {
>     // get the execution environment
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     // get input data
>     final DataStreamSource<String> text = env.addSource(new SourceFunction<String>() {
>         @Override
>         public void run(final SourceContext<String> ctx) throws Exception {
>             for (int count = 0; count < 5; count++) {
>                 ctx.collect(String.valueOf(count));
>             }
>         }
> 
>         @Override
>         public void cancel() {
>         }
>     });
>     text.print().setParallelism(1);
>     env.execute("Simple Test");
>     // Never ends!
> }
> {code}
>  
> It's critical for us as we heavily rely on this "source exhaustion stop" 
> mechanism to achieve a proper stop of streaming applications from their own 
> code, so it prevents us from using the latest Flink versions.
>  
> The log extract shows that the local cluster tried to shut down, but could 
> not do it for no apparent reason:
>  
> {{[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. 
> Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to 
> RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
>  {{[2018-11-07 11:11:13,840] INFO No state backend has been configured, using 
> default (Memory / JobManager) MemoryStateBackend (data in heap memory / 
> checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', 
> asynchronous: TRUE, maxStateSize: 5242880) 
> (org.apache.flink.streaming.runtime.tasks.StreamTask:230)}}
>  {{0}}
>  {{1}}
>  {{2}}
>  {{3}}
>  {{4}}
>  {{[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to 
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to 
> FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)}}
>  {{[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom 
> Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). 
> (org.apache.flink.runtime.taskmanager.Task:818)}}
>  {{[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed 
> for task Source: Custom Source -> Sink: Print to Std. Out (1/1) 
> (07ae66bef91de06205cf22a337ea1fe2) [FINISHED] 
> (org.apache.flink.runtime.taskmanager.Task:845)}}
>  {{[2018-11-07 11:11:13,899] INFO Un-registering task and sending final 
> execution state FINISHED to JobManager for task Source: Custom Source -> 
> Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. 
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)}}
>  {{[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to 
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to 
> FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
>  {{[2018-11-07 11:11:13,907] INFO Job Simple Test 
> (0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. 
> (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)}}
>  {{[2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job 
> 0ef8697ca98f6d2b565ed928d17c8a49. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)}}
>  {{[2018-11-07 11:11:13,908] INFO Shutting down 
> (org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)}}
>  {{[2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster 
> (org.apache.flink.runtime.minicluster.MiniCluster:427)}}
>  {{[2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. 
> (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)}}
>  {{[2018-11-07 11:11:23,582] INFO Stopping TaskExecutor 
> akka://flink/user/taskmanager_0. 
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:291)}}
>  {{[2018-11-07 11:11:23,583] INFO Shutting down 
> TaskExecutorLocalStateStoresManager. 
> (org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager:213)}}
>  

[jira] [Commented] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end

2018-11-30 Thread Arnaud Linz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704554#comment-16704554
 ] 

Arnaud Linz commented on FLINK-10832:
-

Okay, I've just tested with the new 1.7.0. All I did was replace the version 
in the pom, but the test fails with:

Exception in thread "main" java.lang.NoClassDefFoundError: 
org/apache/flink/shaded/guava18/com/google/common/hash/Hashing
 at 
org.apache.flink.streaming.api.graph.StreamGraphHasherV2.traverseStreamGraphAndGenerateHashes(StreamGraphHasherV2.java:80)
 at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:145)
 at 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:93)
 at 
org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:669)
 at 
org.apache.flink.optimizer.plan.StreamingPlan.getJobGraph(StreamingPlan.java:40)
 at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:92)
 at flink.flink_10832.App.testFlink10832(App.java:60)
 at flink.flink_10832.App.main(App.java:31)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.shaded.guava18.com.google.common.hash.Hashing
 at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 ... 8 more

 

It seems that your Maven package for 1.7.0 is incomplete...
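
A quick way to check whether the shaded Guava classes are visible on the job 
classpath at all (a minimal, hypothetical check, not part of the original 
report; the class name is copied from the stack trace above):

    // Throws ClassNotFoundException when the shaded Guava classes are absent,
    // which is the root cause behind the NoClassDefFoundError shown above.
    public class ShadedGuavaCheck {
        public static void main(final String[] args) throws ClassNotFoundException {
            Class.forName(
                "org.apache.flink.shaded.guava18.com.google.common.hash.Hashing");
            System.out.println("Shaded Guava found on the classpath.");
        }
    }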

 

And I'm stuck:

< v1.6.0 => use case fails with an out-of-memory error

v1.6.0 => use case succeeds, but YARN containers are never killed => no go

> v1.6.0 => local cluster does not end.

What do you mean by "I ran the test from the JIRA once each on master/1.6.2 
but it worked fine"? Did you not use any Flink Maven dependency, but instead 
ran the test from the Flink project itself? Could it be a jar packaging issue?

 

> StreamExecutionEnvironment.execute() does not return when all sources end
> -
>
> Key: FLINK-10832
> URL: https://issues.apache.org/jira/browse/FLINK-10832
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.5, 1.6.2
>Reporter: Arnaud Linz
>Priority: Critical
> Attachments: flink-10832.zip, log.txt
>
>
> In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), this code never ends:
>     public void testFlink() throws Exception {
>         // get the execution environment
>         final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>         // get input data
>         final DataStreamSource<String> text = env.addSource(new SourceFunction<String>() {
>             @Override
>             public void run(final SourceContext<String> ctx) throws Exception {
>                 for (int count = 0; count < 5; count++) {
>                     ctx.collect(String.valueOf(count));
>                 }
>             }
>             @Override
>             public void cancel() {
>             }
>         });
>         text.print().setParallelism(1);
>         env.execute("Simple Test");
>         // Never ends!
>     }
>  
> It's critical for us, as we heavily rely on this "source exhaustion stop" 
> mechanism to achieve a proper stop of streaming applications from their own 
> code, so it prevents us from using the latest Flink versions.
>  
> The log extract shows that the local cluster tried to shut down, but could 
> not, for no apparent reason:
>  
> {{[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. 
> Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to 
> RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
>  {{[2018-11-07 11:11:13,840] INFO No state backend has been configured, using 
> default (Memory / JobManager) MemoryStateBackend (data in heap memory / 
> checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', 
> asynchronous: TRUE, maxStateSize: 5242880) 
> (org.apache.flink.streaming.runtime.tasks.StreamTask:230)}}
>  {{0}}
>  {{1}}
>  {{2}}
>  {{3}}
>  {{4}}
>  {{[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to 
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to 
> FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)}}
>  {{[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom 
> Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). 
> 

[jira] [Commented] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end

2018-11-21 Thread Arnaud Linz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694731#comment-16694731
 ] 

Arnaud Linz commented on FLINK-10832:
-

Strange... I can reproduce it 100% of the time. Have you tried the attached 
Maven project?


[jira] [Comment Edited] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end

2018-11-21 Thread Arnaud Linz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694517#comment-16694517
 ] 

Arnaud Linz edited comment on FLINK-10832 at 11/21/18 10:24 AM:


The main thread hangs in the MiniCluster.close() method on this future get:

{{closeAsync().get();}}

but the termination future returned by closeAsync() never completes (all its 
fields are null under the debugger).
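
To locate what the shutdown is still waiting on, a full thread dump is the most 
direct tool: {{jstack <pid>}} from outside the JVM, or, as a minimal in-process 
sketch (a hypothetical helper, not part of the attached project), something like 
the following, run from a daemon watchdog thread a few seconds after execute() 
should have returned:

    import java.util.Map;

    // Prints every live thread's stack so the frame blocking inside
    // MiniCluster.close() can be identified.
    public final class StackDump {
        public static void dumpAllStacks() {
            for (final Map.Entry<Thread, StackTraceElement[]> entry
                    : Thread.getAllStackTraces().entrySet()) {
                System.err.println("Thread: " + entry.getKey().getName());
                for (final StackTraceElement frame : entry.getValue()) {
                    System.err.println("    at " + frame);
                }
            }
        }
    }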

 

 


was (Author: arnaudl):
The main thread hangs in the MiniCluster.close() method on this future get:
{{closeAsync().get();}}

 


[jira] [Commented] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end

2018-11-21 Thread Arnaud Linz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694517#comment-16694517
 ] 

Arnaud Linz commented on FLINK-10832:
-

The main thread hangs in the MiniCluster.close() method on this future get:

{{closeAsync().get();}}

 


[jira] [Commented] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end

2018-11-21 Thread Arnaud Linz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694407#comment-16694407
 ] 

Arnaud Linz commented on FLINK-10832:
-

Sorry for the long delay, I did not get the notification; that's not a mark of 
disinterest :)

I've attached a minimal project that reproduces it, and the log it produces.

 

 


[jira] [Updated] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end

2018-11-21 Thread Arnaud Linz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Linz updated FLINK-10832:

Attachment: (was: pom.xml)


[jira] [Updated] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end

2018-11-21 Thread Arnaud Linz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Linz updated FLINK-10832:

Attachment: log.txt


[jira] [Updated] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end

2018-11-21 Thread Arnaud Linz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Linz updated FLINK-10832:

Attachment: flink-10832.zip


[jira] [Updated] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end

2018-11-21 Thread Arnaud Linz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Linz updated FLINK-10832:

Attachment: pom.xml


[jira] [Commented] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end

2018-11-21 Thread Arnaud Linz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694375#comment-16694375
 ] 

Arnaud Linz commented on FLINK-10832:
-

Sorry for the late reply, I did not get the post notification. Your question 
made me think that you couldn't reproduce this issue, so I've made a minimal 
project, and I could not reproduce it either. So it's probably a jar conflict 
with one of my project dependencies.
Based on this, I will conduct further investigations to identify the conflict 
and keep you informed.
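
A standard way to hunt for such a conflict is to inspect the dependency tree, 
e.g. {{mvn dependency:tree -Dincludes=org.apache.flink}}, which lists the Flink 
artifacts (and versions) that each of the project's dependencies pulls in.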


[jira] [Issue Comment Deleted] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end

2018-11-21 Thread Arnaud Linz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Linz updated FLINK-10832:

Comment: was deleted

(was: Sorry for the late reply, I did not get the post notification. Your 
question made me think that you couldn't reproduce this issue, so I've made a 
minimal project, and I could not reproduce it either. So it's probably a jar 
conflict with one of my project dependencies.
Based on this, I will conduct further investigations to identify the conflict 
and keep you informed.)


[jira] [Updated] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end

2018-11-08 Thread Arnaud Linz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Linz updated FLINK-10832:

Description: 
In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), this code never ends:

 {{ 
public void testFlink162() throws Exception {
    // get the execution environment
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    // get input data
    final DataStreamSource<String> text = env.addSource(new SourceFunction<String>() {
        @Override
        public void run(final SourceContext<String> ctx) throws Exception {
            for (int count = 0; count < 5; count++) {
                ctx.collect(String.valueOf(count));
            }
        }
        @Override
        public void cancel() {
        }
    });
    text.print().setParallelism(1);
    env.execute("Simple Test");
    // Never ends!
}
}}
 

 

It's critical for us, as we heavily rely on this "source exhaustion stop" 
mechanism to achieve a proper stop of streaming applications from their own 
code, so it prevents us from using the latest Flink versions.

 

The log extract shows that the local cluster tried to shut down, but could 
not, for no apparent reason:

 

{{[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. 
Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to 
RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
 {{[2018-11-07 11:11:13,840] INFO No state backend has been configured, using 
default (Memory / JobManager) MemoryStateBackend (data in heap memory / 
checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', 
asynchronous: TRUE, maxStateSize: 5242880) 
(org.apache.flink.streaming.runtime.tasks.StreamTask:230)}}
 {{0}}
 {{1}}
 {{2}}
 {{3}}
 {{4}}
 {{[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to Std. 
Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to FINISHED. 
(org.apache.flink.runtime.taskmanager.Task:915)}}
 {{[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom 
Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). 
(org.apache.flink.runtime.taskmanager.Task:818)}}
 {{[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed 
for task Source: Custom Source -> Sink: Print to Std. Out (1/1) 
(07ae66bef91de06205cf22a337ea1fe2) [FINISHED] 
(org.apache.flink.runtime.taskmanager.Task:845)}}
 {{[2018-11-07 11:11:13,899] INFO Un-registering task and sending final 
execution state FINISHED to JobManager for task Source: Custom Source -> Sink: 
Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. 
(org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)}}
 {{[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to Std. 
Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to FINISHED. 
(org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
 {{[2018-11-07 11:11:13,907] INFO Job Simple Test 
(0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. 
(org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)}}
 {{[2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job 
0ef8697ca98f6d2b565ed928d17c8a49. 
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)}}
 {{[2018-11-07 11:11:13,908] INFO Shutting down 
(org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)}}
 {{[2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster 
(org.apache.flink.runtime.minicluster.MiniCluster:427)}}
 {{[2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. 
(org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)}}
 {{[2018-11-07 11:11:23,582] INFO Stopping TaskExecutor 
akka://flink/user/taskmanager_0. 
(org.apache.flink.runtime.taskexecutor.TaskExecutor:291)}}
 {{[2018-11-07 11:11:23,583] INFO Shutting down 
TaskExecutorLocalStateStoresManager. 
(org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager:213)}}
 {{[2018-11-07 11:11:23,591] INFO I/O manager removed spill file directory 
C:\Users\alinz\AppData\Local\Temp\flink-io-6a19871b-3b86-4a47-9b82-28eef7e55814 
(org.apache.flink.runtime.io.disk.iomanager.IOManager:110)}}
 {{[2018-11-07 11:11:23,591] INFO Shutting down the network environment and its 
components. (org.apache.flink.runtime.io.network.NetworkEnvironment:344)}}
 {{[2018-11-07 11:11:23,591] INFO Removing cache directory 
C:\Users\alinz\AppData\Local\Temp\flink-web-ui 
(org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:733)}}
 {{[2018-11-07 11:11:23,593] INFO Closing the SlotManager. 
(org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:249)}}
 {{[2018-11-07 11:11:23,593] INFO Suspending the SlotManager. 
(org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:212)}}
 {{[2018-11-07 

[jira] [Updated] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end

2018-11-08 Thread Arnaud Linz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Linz updated FLINK-10832:

Description: 
In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), this code never ends:

 

{{ public void testFlink162() throws Exception {}}
{{ // get the execution environment}}
{{ final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();}}
{{ // get input data }}
{{ final DataStreamSource text = env.addSource(new 
SourceFunction() {}}
{{ @Override}}
{{ public void run(final SourceContext ctx) throws Exception {}}
{{ for (int count = 0; count < 5; count++) {}}
{{ ctx.collect(String.valueOf(count));}}
{{ }}}
{{ }}}
{{ @Override}}
{{ public void cancel() {}}
{{ }}}
{{ });}}
{{ text.print().setParallelism(1);}}
{{ env.execute("Simple Test");}}
{{ // Never ends !}}
{{ }}}

 

 

This is critical for us: we rely heavily on this "source exhaustion" stop
mechanism to let streaming applications stop themselves cleanly from their own
code, so the regression prevents us from upgrading to the latest Flink versions.

 

The log extract shows that the local cluster tried to shut down but, for no
apparent reason, could not:

 

{code}
[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)
[2018-11-07 11:11:13,840] INFO No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) (org.apache.flink.streaming.runtime.tasks.StreamTask:230)
0
1
2
3
4
[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)
[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). (org.apache.flink.runtime.taskmanager.Task:818)
[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) [FINISHED] (org.apache.flink.runtime.taskmanager.Task:845)
[2018-11-07 11:11:13,899] INFO Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source -> Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)
[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)
[2018-11-07 11:11:13,907] INFO Job Simple Test (0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)
[2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job 0ef8697ca98f6d2b565ed928d17c8a49. (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)
[2018-11-07 11:11:13,908] INFO Shutting down (org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)
[2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster (org.apache.flink.runtime.minicluster.MiniCluster:427)
[2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)
[2018-11-07 11:11:23,582] INFO Stopping TaskExecutor akka://flink/user/taskmanager_0. (org.apache.flink.runtime.taskexecutor.TaskExecutor:291)
[2018-11-07 11:11:23,583] INFO Shutting down TaskExecutorLocalStateStoresManager. (org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager:213)
[2018-11-07 11:11:23,591] INFO I/O manager removed spill file directory C:\Users\alinz\AppData\Local\Temp\flink-io-6a19871b-3b86-4a47-9b82-28eef7e55814 (org.apache.flink.runtime.io.disk.iomanager.IOManager:110)
[2018-11-07 11:11:23,591] INFO Shutting down the network environment and its components. (org.apache.flink.runtime.io.network.NetworkEnvironment:344)
[2018-11-07 11:11:23,591] INFO Removing cache directory C:\Users\alinz\AppData\Local\Temp\flink-web-ui (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:733)
[2018-11-07 11:11:23,593] INFO Closing the SlotManager. (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:249)
[2018-11-07 11:11:23,593] INFO Suspending the SlotManager. (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:212)
[2018-11-07 11:11:23,596] INFO Close ResourceManager connection cd021102669258aad77c20645ed08aae: ResourceManager leader 
{code}

[jira] [Created] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end

2018-11-08 Thread Arnaud Linz (JIRA)
Arnaud Linz created FLINK-10832:
---

 Summary: StreamExecutionEnvironment.execute() does not return when 
all sources end
 Key: FLINK-10832
 URL: https://issues.apache.org/jira/browse/FLINK-10832
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.6.2, 1.5.5
Reporter: Arnaud Linz


[jira] [Updated] (FLINK-8580) No easy way (or issues when trying?) to handle multiple yarn sessions and choose at runtime the one to submit a ha streaming job

2018-02-07 Thread Arnaud Linz (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Linz updated FLINK-8580:
---
Description: 
Hello,

I am using Flink 1.3.2 and I’m struggling to achieve something that should be 
simple.

For isolation reasons, I want to start multiple long-lived YARN session
containers (with the same user) and choose at run time, when I start an HA
streaming app, which container will hold it.

I start my YARN session with the command-line option
-Dyarn.properties-file.location=mydir.

The session is created and a .yarn-properties-$USER file is generated.
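For illustration, the kind of setup I mean looks like this (cluster sizes, detached mode, and directory names are made-up examples):

{code}
# Two long-lived session clusters, each with its own properties directory:
yarn-session.sh -n 4 -d -Dyarn.properties-file.location=/data/flink/session-a
yarn-session.sh -n 4 -d -Dyarn.properties-file.location=/data/flink/session-b
# Each start generates a .yarn-properties-$USER file under its own directory;
# the open question is how to point "flink run" at one of them.
{code}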

And I’ve tried the following to submit my job:

 

*CASE 1*

*flink-conf.yaml* : yarn.properties-file.location: mydir
 *flink run options* : none
 * Uses ZooKeeper and works, but I cannot choose the container, as the
property file is global.

 

*CASE 2*

*flink-conf.yaml* : nothing
 *flink run options* : -yid applicationId
 * Does not use ZooKeeper; tries to connect to the YARN JobManager, but fails
with a “Job submission to the JobManager timed out” error

 

*CASE 3*

*flink-conf.yaml* : nothing
 *flink run options* : -yid applicationId and -yD with all dynamic properties
found in the “dynamicPropertiesString” of the .yarn-properties-$USER file
 * Same as case 2

 

*CASE 4*

*flink-conf.yaml* : nothing
 *flink run options* : -yD yarn.properties-file.location=mydir
 * Tries to connect to the local (non-YARN) JobManager (and fails)

 

*CASE 5*

Even weirder:

*flink-conf.yaml* : yarn.properties-file.location: mydir
 *flink run options* : -yD yarn.properties-file.location=mydir
 * Still tries to connect to the local (non-YARN) JobManager!

 

Without any other solution, I've made a shell script that copies the original
content of FLINK_CONF_DIR into a temporary dir, modifies flink-conf.yaml to set
yarn.properties-file.location, and changes FLINK_CONF_DIR to that temp dir
before executing flink to submit the job.

I am now able to select the container I want, but I think it should be made 
simpler…
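
For reference, a minimal sketch of that workaround script, assuming a standard Flink client layout (the script name and paths are illustrative):

{code}
#!/usr/bin/env bash
# Usage: submit-to-session.sh <properties-dir> <flink-run-args...>
set -e
PROPS_DIR="$1"; shift

# Clone the original configuration directory.
TMP_CONF="$(mktemp -d)"
cp -r "$FLINK_CONF_DIR"/. "$TMP_CONF"

# Point yarn.properties-file.location at the chosen session.
grep -v '^yarn.properties-file.location:' "$TMP_CONF/flink-conf.yaml" \
    > "$TMP_CONF/flink-conf.yaml.new" || true
echo "yarn.properties-file.location: $PROPS_DIR" >> "$TMP_CONF/flink-conf.yaml.new"
mv "$TMP_CONF/flink-conf.yaml.new" "$TMP_CONF/flink-conf.yaml"

# Submit with the patched configuration directory.
FLINK_CONF_DIR="$TMP_CONF" flink run "$@"
{code}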

 

Logs extracts:

*CASE 1:*

{code}
2018:02:01 15:43:20 - Waiting until all TaskManagers have connected
2018:02:01 15:43:20 - Starting client actor system.
2018:02:01 15:43:20 - Starting ZooKeeperLeaderRetrievalService.
2018:02:01 15:43:20 - Trying to select the network interface and address to use by connecting to the leading JobManager.
2018:02:01 15:43:20 - TaskManager will try to connect for 1 milliseconds before falling back to heuristics
2018:02:01 15:43:21 - Retrieved new target address elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr/10.136.170.193:33970.
2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.
2018:02:01 15:43:21 - Slf4jLogger started
2018:02:01 15:43:21 - Starting remoting
2018:02:01 15:43:21 - Remoting started; listening on addresses :[akka.tcp://fl...@elara-edge-u2-n01.dev.mlb.jupiter.nbyt.fr:36340]
2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.
2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.
2018:02:01 15:43:21 - TaskManager status (2/1)
2018:02:01 15:43:21 - All TaskManagers are connected
2018:02:01 15:43:21 - Submitting job with JobID: f69197b0b80a76319a87bde10c1e3f77. Waiting for job completion.
2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.
2018:02:01 15:43:21 - Received SubmitJobAndWait(JobGraph(jobId: f69197b0b80a76319a87bde10c1e3f77)) but there is no connection to a JobManager yet.
2018:02:01 15:43:21 - Received job SND-IMP-SIGNAST (f69197b0b80a76319a87bde10c1e3f77).
2018:02:01 15:43:21 - Disconnect from JobManager null.
2018:02:01 15:43:21 - Connect to JobManager Actor[akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].
2018:02:01 15:43:21 - Connected to JobManager at Actor[akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245] with leader session id 388af5b8--4923-8ee4-8a4b9bfbb0b9.
2018:02:01 15:43:21 - Sending message to JobManager akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager to submit job SND-IMP-SIGNAST (f69197b0b80a76319a87bde10c1e3f77) and wait for progress
2018:02:01 15:43:21 - Upload jar files to job manager akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
2018:02:01 15:43:21 - Blob client connecting to akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager
2018:02:01 15:43:22 - Submit job to the job manager akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.
2018:02:01 15:43:22 - Job f69197b0b80a76319a87bde10c1e3f77 was successfully submitted to the JobManager akka://flink/deadLetters.
2018:02:01 15:43:22 - 02/01/2018 15:43:22 Job execution switched to status RUNNING.
{code}

 

*CASE 2:*

{code}
2018:02:01 15:48:43 - Waiting until all TaskManagers have 
{code}
[jira] [Created] (FLINK-8580) No easy way (or issues when trying?) to handle multiple yarn sessions and choose at runtime the one to submit a ha streaming job

2018-02-07 Thread Arnaud Linz (JIRA)
Arnaud Linz created FLINK-8580:
--

 Summary: No easy way (or issues when trying?) to handle multiple 
yarn sessions and choose at runtime the one to submit a ha streaming job
 Key: FLINK-8580
 URL: https://issues.apache.org/jira/browse/FLINK-8580
 Project: Flink
  Issue Type: Improvement
  Components: Job-Submission
Affects Versions: 1.3.2
Reporter: Arnaud Linz


[jira] [Closed] (FLINK-6289) ExecutionEnvironment.readTextFile() can read gzip files & directories but not both

2017-04-10 Thread Arnaud Linz (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Linz closed FLINK-6289.
--
   Resolution: Invalid
Fix Version/s: 1.1.3

Actually, it works... It seems I was too quick to report a problem I can no
longer reproduce, sorry...



[jira] [Created] (FLINK-6289) ExecutionEnvironment.readTextFile() can read gzip files & directories but not both

2017-04-10 Thread Arnaud Linz (JIRA)
Arnaud Linz created FLINK-6289:
--

 Summary: ExecutionEnvironment.readTextFile() can read gzip files & 
directories but not both
 Key: FLINK-6289
 URL: https://issues.apache.org/jira/browse/FLINK-6289
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.1.3
Reporter: Arnaud Linz
Priority: Minor


When calling `ExecutionEnvironment.readTextFile()` passing a ".gz" file as
input, it decompresses the file correctly. If I pass a directory as input, it
reads the files contained in the directory. But if I pass a directory
containing ".gz" files as input, it does not decompress the files and treats
them as plain text files.





[jira] [Created] (FLINK-5305) Documentation of collect() should mention parameter akka.framesize

2016-12-09 Thread Arnaud Linz (JIRA)
Arnaud Linz created FLINK-5305:
--

 Summary: Documentation of collect() should mention parameter 
akka.framesize
 Key: FLINK-5305
 URL: https://issues.apache.org/jira/browse/FLINK-5305
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.1.3
Reporter: Arnaud Linz
Priority: Minor


When calling collect() on a big dataset, if the data size exceeds the value of
akka.framesize (10 MB by default), the collect() call freezes and "akka payload
size exceeded" messages appear in the YARN logs (but neither in the Flink
console output nor in the UI logs).

10 MB is not that big, and its relation to akka.framesize is hard to find in
the documentation (it is buried in the Flink internals section). I believe that
the easy-to-find collect() documentation should mention this parameter, to save
users hours of time.
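
For illustration, raising the limit looks something like this in flink-conf.yaml (the 100 MB value is an arbitrary example, not a recommendation):

{code}
# Maximum frame size for messages sent between the JobManager and the client;
# a collect() result must fit into a single frame. Default: 10485760b (10 MB).
akka.framesize: 104857600b
{code}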





[jira] [Commented] (FLINK-4079) YARN properties file used for per-job cluster

2016-06-21 Thread Arnaud Linz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341327#comment-15341327
 ] 

Arnaud Linz commented on FLINK-4079:


Personally, I'd like to, since it is a blocking problem on one of my environments.

> YARN properties file used for per-job cluster
> -
>
> Key: FLINK-4079
> URL: https://issues.apache.org/jira/browse/FLINK-4079
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.0.3
>Reporter: Ufuk Celebi
>Assignee: Maximilian Michels
> Fix For: 1.1.0, 1.0.4
>
>
> YARN per-job clusters (flink run -m yarn-cluster) rely on the hidden YARN
> properties file, which defines the container configuration. This can lead to
> unexpected behaviour, because the per-job-cluster configuration is merged
> with the YARN properties file (or used as the only configuration source).
> A user ran into this as follows:
> - Create a long-lived YARN session with HA (creates a hidden YARN properties 
> file)
> - Submits standalone batch jobs with a per job cluster (flink run -m 
> yarn-cluster). The batch jobs get submitted to the long lived HA cluster, 
> because of the properties file.
> [~mxm] Am I correct in assuming that this is only relevant for the 1.0 branch 
> and will be fixed with the client refactoring you are working on?





[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters

2016-06-10 Thread Arnaud Linz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324308#comment-15324308
 ] 

Arnaud Linz commented on FLINK-3763:


No, but I do remember losing patience with cancellation during my early usage
of the RMQ source.

> RabbitMQ Source/Sink standardize connection parameters
> --
>
> Key: FLINK-3763
> URL: https://issues.apache.org/jira/browse/FLINK-3763
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.1
>Reporter: Robert Batts
>Assignee: Subhankar Biswas
> Fix For: 1.1.0
>
>
> The RabbitMQ source and sink should have the same capabilities in terms of
> establishing a connection; currently the sink is lacking connection
> parameters that are available on the source. Additionally, VirtualHost should
> be an offered parameter for multi-tenant RabbitMQ clusters (if not specified
> it goes to the vhost '/').
> Connection Parameters
> ===
> - Host - Offered on both
> - Port - Source only
> - Virtual Host - Neither
> - User - Source only
> - Password - Source only
> Additionally, it might be worth offering the URI as a valid constructor,
> because that would offer all 5 of the above parameters in a single String.




[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters

2016-06-09 Thread Arnaud Linz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15322517#comment-15322517
 ] 

Arnaud Linz commented on FLINK-3763:


This comment is probably too late, but I have a problem with:

{code:java}
while (running) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    ...
}
{code}

and

{code:java}
public void cancel() {
    running = false;
}
{code}

If no element ever comes into the queue, nextDelivery() may wait forever, as no
timeout is provided; the while (running) condition is then never re-evaluated,
and the cancel() method does nothing. This prevents us from properly cancelling
such sources, forcing the user to kill the YARN application to get past the
workflow's never-ending "cancelling" state.




[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters

2016-05-30 Thread Arnaud Linz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306684#comment-15306684
 ] 

Arnaud Linz commented on FLINK-3763:


IMO, this source lacks many RMQ features needed to make it usable in
production. VirtualHost is one of them, but I think that the whole RabbitMQ
factory should be made available to the user; currently it is not even
accessible from an inherited class.

It would also be a good idea to make this source truly cancellable by adding
something like {{consumer.getChannel().abort(AMQP.REPLY_SUCCESS, "Cancellation
requested by user");}} in the {{cancel()}} method, or else it may never end in
a clean way.
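
Concretely, something along these lines (a sketch only, not the actual connector code; {{consumer}} and {{running}} stand for the connector's existing fields):

{code:java}
import java.io.IOException;
import com.rabbitmq.client.AMQP;

@Override
public void cancel() {
    running = false;
    try {
        if (consumer != null && consumer.getChannel() != null) {
            // abort() forces the channel closed, waking up a consumer
            // blocked in nextDelivery(); it suits a best-effort
            // cancellation path better than close().
            consumer.getChannel().abort(AMQP.REPLY_SUCCESS, "Cancellation requested by user");
        }
    } catch (final IOException e) {
        // Best effort: we are shutting down anyway.
    }
}
{code}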



[jira] [Issue Comment Deleted] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters

2016-05-30 Thread Arnaud Linz (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Linz updated FLINK-3763:
---
Comment: was deleted

(was: IMO, this source lacks many RMQ features needed to make it usable in
production.

VirtualHost is one of them, but shutdown timeout is also important, or else
the stream may fail after 10 minutes of inactivity; hence the whole RabbitMQ
factory should be made available to the user. Currently it is not even
accessible from an inherited class.

It would also be a good idea to make this source truly cancellable by adding
something like {{consumer.getChannel().abort(AMQP.REPLY_SUCCESS, "Cancellation
requested by user");}} in the {{cancel()}} method, or else it may never end in
a clean way.

)



[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters

2016-05-30 Thread Arnaud Linz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306678#comment-15306678
 ] 

Arnaud Linz commented on FLINK-3763:


IMO, this source lacks many RMQ features needed to make it usable in
production.

VirtualHost is one of them, but shutdown timeout is also important, or else
the stream may fail after 10 minutes of inactivity; hence the whole RabbitMQ
factory should be made available to the user. Currently it is not even
accessible from an inherited class.

It would also be a good idea to make this source truly cancellable by adding
something like {{consumer.getChannel().abort(AMQP.REPLY_SUCCESS, "Cancellation
requested by user");}} in the {{cancel()}} method, or else it may never end in
a clean way.





[jira] [Commented] (FLINK-2617) ConcurrentModificationException when using HCatRecordReader to access a hive table

2015-09-11 Thread Arnaud Linz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14740440#comment-14740440
 ] 

Arnaud Linz commented on FLINK-2617:


Well, I've searched the Apache snapshot repository for 0.9-SNAPSHOT and I've
looked at http://stratosphere-bin.s3.amazonaws.com/flink-0.9-SNAPSHOT-bin-hadoop2.tgz.
I was trying to avoid building it from the sources until I can find the time to
configure my environment.

> ConcurrentModificationException when using HCatRecordReader to access a hive 
> table
> --
>
> Key: FLINK-2617
> URL: https://issues.apache.org/jira/browse/FLINK-2617
> Project: Flink
>  Issue Type: Bug
>  Components: Hadoop Compatibility
>Reporter: Arnaud Linz
>Assignee: Fabian Hueske
>Priority: Critical
> Fix For: 0.9, 0.10
>
>
> I don't know if it's an HCat or a Flink problem, but when reading a Hive table
> in a cluster with many slots (20 threads per container), I systematically run
> into a {{ConcurrentModificationException}} in a copy method of a
> {{Configuration}} object that changes during the copy.
> From what I understand, this object comes from
> {{TaskAttemptContext.getConfiguration()}}, created by
> {{HadoopUtils.instantiateTaskAttemptContext(configuration, new
> TaskAttemptID());}}
> Maybe the {{job.Configuration}} object passed to the constructor of
> {{HadoopInputFormatBase}} should be cloned somewhere?
> Stack trace is:
> {code}
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
> org.apache.flink.client.program.Client.run(Client.java:413)
> org.apache.flink.client.program.Client.run(Client.java:356)
> org.apache.flink.client.program.Client.run(Client.java:349)
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
> com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:73)
> com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:69)
> com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:50)
> com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:88)
> com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:606)
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
> org.apache.flink.client.program.Client.run(Client.java:315)
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
> org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
> org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
> scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> akka.actor.ActorCell.invoke(ActorCell.scala:487)
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> akka.dispatch.Mailbox.run(Mailbox.scala:221)
> akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 
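
If the shared {{Configuration}} is indeed the culprit, the defensive copy suggested above could be as small as this (a hypothetical sketch against the Hadoop API; {{sharedJobConfiguration}} is an illustrative name):

{code:java}
import org.apache.hadoop.conf.Configuration;

// Configuration has a copy constructor, so each parallel task can get its
// own defensive copy instead of iterating over one shared mutable instance.
final Configuration privateCopy = new Configuration(sharedJobConfiguration);
// ... hand privateCopy (not sharedJobConfiguration) to the per-task
// TaskAttemptContext / HCatRecordReader.
{code}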

[jira] [Commented] (FLINK-2617) ConcurrentModificationException when using HCatRecordReader to access a hive table

2015-09-11 Thread Arnaud Linz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14740386#comment-14740386
 ] 

Arnaud Linz commented on FLINK-2617:


Hi, today (11-09) I've tested the 0.10 nightly build with success, thank you
very much. However, I did not find the patch in the 0.9 snapshot; is that normal?


[jira] [Commented] (FLINK-2617) ConcurrentModificationException when using HCatRecordReader to access a hive table

2015-09-09 Thread Arnaud Linz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736262#comment-14736262
 ] 

Arnaud Linz commented on FLINK-2617:


hive-hcatalog-core-0.14.0.jar

> ConcurrentModificationException when using HCatRecordReader to access a hive 
> table
> --
>
> Key: FLINK-2617
> URL: https://issues.apache.org/jira/browse/FLINK-2617
> Project: Flink
>  Issue Type: Bug
>  Components: Hadoop Compatibility
>Reporter: Arnaud Linz
>Priority: Critical
>
> I don't know if it's a hcat or a flink problem, but when reading a hive table 
> in a cluster with many slots (20 threads per container), I systematically run 
> into a {{ConcurrentModificationException}} in a copy method of a 
> {{Configuration}} object that change during the copy.
> From what I understand, this object comes from 
> {{TaskAttemptContext.getConfiguration()}} created by 
> {{HadoopUtils.instantiateTaskAttemptContext(configuration, new 
> TaskAttemptID());}} 
> Maybe the {{job.Configuration}} object passed to the constructor of 
> {{HadoopInputFormatBase}} should be cloned somewhere?
> Stack trace is :
> {code}
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
> org.apache.flink.client.program.Client.run(Client.java:413)
> org.apache.flink.client.program.Client.run(Client.java:356)
> org.apache.flink.client.program.Client.run(Client.java:349)
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
> com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:73)
> com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:69)
> com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:50)
> com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:88)
> com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:606)
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
> org.apache.flink.client.program.Client.run(Client.java:315)
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
> org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
> org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
> scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> akka.actor.ActorCell.invoke(ActorCell.scala:487)
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> akka.dispatch.Mailbox.run(Mailbox.scala:221)
> akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by:  java.util.ConcurrentModificationException
> java.util.HashMap$HashIterator.nextEntry(HashMap.java:926)
> java.util.HashMap$KeyIterator.next(HashMap.java:960)
> java.util.AbstractCollection.addAll(AbstractCollection.java:341)
> java.util.HashSet.<init>(HashSet.java:117)
> 

[jira] [Commented] (FLINK-2617) ConcurrentModificationException when using HCatRecordReader to access a hive table

2015-09-03 Thread Arnaud Linz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14729289#comment-14729289
 ] 

Arnaud Linz commented on FLINK-2617:


(I think that this issue is in fact critical because it happens really often 
on my cluster as soon as parallelism is > 250.)

> ConcurrentModificationException when using HCatRecordReader to access a hive 
> table
> --
>
> Key: FLINK-2617
> URL: https://issues.apache.org/jira/browse/FLINK-2617
> Project: Flink
>  Issue Type: Bug
>  Components: Hadoop Compatibility
>Reporter: Arnaud Linz
>Priority: Critical
>
> I don't know if it's an HCat or a Flink problem, but when reading a hive table 
> in a cluster with many slots (20 threads per container), I systematically run 
> into a {{ConcurrentModificationException}} in a copy method of a 
> {{Configuration}} object that changes during the copy.
> From what I understand, this object comes from 
> {{TaskAttemptContext.getConfiguration()}} created by 
> {{HadoopUtils.instantiateTaskAttemptContext(configuration, new 
> TaskAttemptID());}} 
> Maybe the {{job.Configuration}} object passed to the constructor of 
> {{HadoopInputFormatBase}} should be cloned somewhere?
> Stack trace is :
> {code}
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
> org.apache.flink.client.program.Client.run(Client.java:413)
> org.apache.flink.client.program.Client.run(Client.java:356)
> org.apache.flink.client.program.Client.run(Client.java:349)
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
> com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:73)
> com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:69)
> com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:50)
> com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:88)
> com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:606)
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
> org.apache.flink.client.program.Client.run(Client.java:315)
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
> org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
> org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
> scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> akka.actor.ActorCell.invoke(ActorCell.scala:487)
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> akka.dispatch.Mailbox.run(Mailbox.scala:221)
> akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by:  java.util.ConcurrentModificationException
> java.util.HashMap$HashIterator.nextEntry(HashMap.java:926)
> java.util.HashMap$KeyIterator.next(HashMap.java:960)
> 

[jira] [Updated] (FLINK-2617) ConcurrentModificationException when using HCatRecordReader to access a hive table

2015-09-03 Thread Arnaud Linz (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Linz updated FLINK-2617:
---
Priority: Critical  (was: Major)

> ConcurrentModificationException when using HCatRecordReader to access a hive 
> table
> --
>
> Key: FLINK-2617
> URL: https://issues.apache.org/jira/browse/FLINK-2617
> Project: Flink
>  Issue Type: Bug
>  Components: Hadoop Compatibility
>Reporter: Arnaud Linz
>Priority: Critical
>
> I don't know if it's an HCat or a Flink problem, but when reading a hive table 
> in a cluster with many slots (20 threads per container), I systematically run 
> into a {{ConcurrentModificationException}} in a copy method of a 
> {{Configuration}} object that changes during the copy.
> From what I understand, this object comes from 
> {{TaskAttemptContext.getConfiguration()}} created by 
> {{HadoopUtils.instantiateTaskAttemptContext(configuration, new 
> TaskAttemptID());}} 
> Maybe the {{job.Configuration}} object passed to the constructor of 
> {{HadoopInputFormatBase}} should be cloned somewhere?
> Stack trace is :
> {code}
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
> org.apache.flink.client.program.Client.run(Client.java:413)
> org.apache.flink.client.program.Client.run(Client.java:356)
> org.apache.flink.client.program.Client.run(Client.java:349)
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
> com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:73)
> com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:69)
> com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:50)
> com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:88)
> com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:606)
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
> org.apache.flink.client.program.Client.run(Client.java:315)
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
> org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
> org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
> scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> akka.actor.ActorCell.invoke(ActorCell.scala:487)
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> akka.dispatch.Mailbox.run(Mailbox.scala:221)
> akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by:  java.util.ConcurrentModificationException
> java.util.HashMap$HashIterator.nextEntry(HashMap.java:926)
> java.util.HashMap$KeyIterator.next(HashMap.java:960)
> java.util.AbstractCollection.addAll(AbstractCollection.java:341)
> java.util.HashSet.<init>(HashSet.java:117)
> 

[jira] [Created] (FLINK-2617) ConcurrentModificationException when using HCatRecordReader to access a hive table

2015-09-03 Thread Arnaud Linz (JIRA)
Arnaud Linz created FLINK-2617:
--

 Summary: ConcurrentModificationException when using 
HCatRecordReader to access a hive table
 Key: FLINK-2617
 URL: https://issues.apache.org/jira/browse/FLINK-2617
 Project: Flink
  Issue Type: Bug
  Components: Hadoop Compatibility
Reporter: Arnaud Linz


I don't know if it's an HCat or a Flink problem, but when reading a hive table 
in a cluster with many slots (20 threads per container), I systematically run 
into a {{ConcurrentModificationException}} in a copy method of a 
{{Configuration}} object that changes during the copy.

From what I understand, this object comes from 
{{TaskAttemptContext.getConfiguration()}} created by 
{{HadoopUtils.instantiateTaskAttemptContext(configuration, new 
TaskAttemptID());}} 

Maybe the {{job.Configuration}} object passed to the constructor of 
{{HadoopInputFormatBase}} should be cloned somewhere?
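
A minimal sketch of that suggestion, assuming the copy is made wherever the 
input format builds its per-task configuration (the helper class and lock shown 
here are illustrative assumptions, not Flink's actual code):

{code}
import org.apache.hadoop.conf.Configuration;

public final class ConfigurationCloning {

    private static final Object CLONE_LOCK = new Object();

    // The Configuration copy constructor iterates over the source's
    // properties, so guarding it with a lock keeps a concurrent writer from
    // triggering the ConcurrentModificationException shown in the trace below.
    public static Configuration cloneForTask(Configuration shared) {
        synchronized (CLONE_LOCK) {
            return new Configuration(shared);
        }
    }
}
{code}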

Stack trace is :
{code}
org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Job execution failed.
org.apache.flink.client.program.Client.run(Client.java:413)
org.apache.flink.client.program.Client.run(Client.java:356)
org.apache.flink.client.program.Client.run(Client.java:349)
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:73)
com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:69)
com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:50)
com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:88)
com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
org.apache.flink.client.program.Client.run(Client.java:315)
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
akka.actor.Actor$class.aroundReceive(Actor.scala:465)
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
akka.actor.ActorCell.invoke(ActorCell.scala:487)
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
akka.dispatch.Mailbox.run(Mailbox.scala:221)
akka.dispatch.Mailbox.exec(Mailbox.scala:231)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by:  java.util.ConcurrentModificationException
java.util.HashMap$HashIterator.nextEntry(HashMap.java:926)
java.util.HashMap$KeyIterator.next(HashMap.java:960)
java.util.AbstractCollection.addAll(AbstractCollection.java:341)
java.util.HashSet.<init>(HashSet.java:117)
org.apache.hadoop.conf.Configuration.<init>(Configuration.java:554)
org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:439)
org.apache.hive.hcatalog.common.HCatUtil.getJobConfFromContext(HCatUtil.java:637)
org.apache.hive.hcatalog.mapreduce.HCatRecordReader.createBaseRecordReader(HCatRecordReader.java:112)
org.apache.hive.hcatalog.mapreduce.HCatRecordReader.initialize(HCatRecordReader.java:91)
{code}

[jira] [Created] (FLINK-2589) Threads created in TimeTriggerPolicy don't end properly

2015-08-28 Thread Arnaud Linz (JIRA)
Arnaud Linz created FLINK-2589:
--

 Summary: Threads created in TimeTriggerPolicy don't end properly
 Key: FLINK-2589
 URL: https://issues.apache.org/jira/browse/FLINK-2589
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Arnaud Linz
Priority: Minor


TimeTriggerPolicy uses a thread (TimeCheck) to push fake events in case of a 
timeout. However, this thread implements an infinite loop and ignores 
InterruptedExceptions: it never ends properly.

Once created, it continues to push fake events even after the execution is over, 
polluting the standard error output with stack traces because posting the fake 
element fails. This especially occurs in unit tests using local clusters, 
because the JVM does not exit.
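
As a minimal sketch (the field and method names below are illustrative 
assumptions, not the actual TimeTriggerPolicy internals), an interrupt-aware 
version of the loop could look like this:

{code}
public class TimeCheck implements Runnable {

    private final long granularityMillis;
    private volatile boolean running = true;

    public TimeCheck(long granularityMillis) {
        this.granularityMillis = granularityMillis;
    }

    // Lets the owning operator stop the loop cooperatively at shutdown.
    public void stop() {
        running = false;
    }

    @Override
    public void run() {
        while (running && !Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(granularityMillis);
                // emit the fake (timeout) element here
            } catch (InterruptedException e) {
                // Restore the interrupt flag and leave the loop instead of
                // swallowing the exception, so the JVM can exit cleanly.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
{code}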

Stack trace extract :

java.lang.RuntimeException: Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OutputHandler$ChainingOutput.collect(OutputHandler.java:291)
at 
org.apache.flink.streaming.runtime.tasks.OutputHandler$ChainingOutput.collect(OutputHandler.java:276)
at 
org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:50)
at 
org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:30)
at 
org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:44)
at 
org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:57)
at 
org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:62)
at 
org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:47)
at 
org.apache.flink.streaming.runtime.tasks.OutputHandler$ChainingOutput.collect(OutputHandler.java:288)
... 9 more
Caused by: java.lang.RuntimeException: Could not forward element to next 
operator
at 
org.apache.flink.streaming.runtime.tasks.OutputHandler$ChainingOutput.collect(OutputHandler.java:291)
at 
org.apache.flink.streaming.runtime.tasks.OutputHandler$ChainingOutput.collect(OutputHandler.java:276)
at 
org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:50)
at 
org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:30)
at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37)
at 
org.apache.flink.streaming.runtime.tasks.OutputHandler$ChainingOutput.collect(OutputHandler.java:288)
... 17 more
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:37)
at 
org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:50)
at 
org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:30)
at 
org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:43)
at 
org.apache.flink.streaming.runtime.tasks.OutputHandler$ChainingOutput.collect(OutputHandler.java:288)
... 22 more
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:144)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:75)
... 27 more





[jira] [Created] (FLINK-2579) StreamExecutionEnvironment & ExecutionEnvironment do not share interface but have a lot in common

2015-08-27 Thread Arnaud Linz (JIRA)
Arnaud Linz created FLINK-2579:
--

 Summary: StreamExecutionEnvironment & ExecutionEnvironment do not 
share interface but have a lot in common
 Key: FLINK-2579
 URL: https://issues.apache.org/jira/browse/FLINK-2579
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Arnaud Linz
Priority: Minor


Both classes 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment and 
org.apache.flink.api.java.ExecutionEnvironment have a lot in common (same 
methods for Kryo registration, fromCollection, etc.) but are not related by a 
Java contract.

That leads to annoying differences; for instance, 
StreamExecutionEnvironment.setParallelism() returns 'this' whereas 
ExecutionEnvironment.setParallelism() has no return value.

They each have their specificities, but maybe they should both implement a 
common interface to make sure that the common signatures are coherent?
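
As a hypothetical sketch (the interface name and method list are assumptions 
for illustration, not an actual Flink API), such a contract could use a self 
type so that both environments keep a fluent style:

{code}
import org.apache.flink.api.common.JobExecutionResult;

public interface CommonExecutionEnvironment<E extends CommonExecutionEnvironment<E>> {

    // Returning the self type lets both environments stay fluent, resolving
    // the setParallelism() discrepancy described above.
    E setParallelism(int parallelism);

    // Shared registration and execution signatures, kept coherent in one place.
    void registerType(Class<?> type);

    JobExecutionResult execute(String jobName) throws Exception;
}
{code}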





[jira] [Created] (FLINK-2580) HadoopDataOutputStream does not expose enough methods of org.apache.hadoop.fs.FSDataOutputStream

2015-08-27 Thread Arnaud Linz (JIRA)
Arnaud Linz created FLINK-2580:
--

 Summary: HadoopDataOutputStream does not expose enough methods of 
org.apache.hadoop.fs.FSDataOutputStream
 Key: FLINK-2580
 URL: https://issues.apache.org/jira/browse/FLINK-2580
 Project: Flink
  Issue Type: Improvement
  Components: Hadoop Compatibility
Reporter: Arnaud Linz
Priority: Minor


I’ve noticed that when you use org.apache.flink.core.fs.FileSystem to write 
into an HDFS file, calling 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(), it returns a 
HadoopDataOutputStream that wraps an org.apache.hadoop.fs.FSDataOutputStream 
(under its org.apache.hadoop.hdfs.client.HdfsDataOutputStream wrapper).

However, FSDataOutputStream exposes many methods like flush(), getPos(), etc., 
but HadoopDataOutputStream only wraps write() and close().

For instance, flush() calls the default, empty implementation of OutputStream 
instead of the Hadoop one, and that’s confusing. Moreover, because of the 
restrictive OutputStream interface, hsync() and hflush() are not exposed to 
Flink.

I see two options:

- complete the class to wrap all methods of OutputStream and add a 
getWrappedStream() to access other features like hsync() (a sketch of this 
option follows below);

- get rid of the Hadoop wrapping and directly use Hadoop file system objects.
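
A minimal sketch of the first option, assuming a plain delegating wrapper 
around the Hadoop stream (the class name is illustrative, not Flink's actual 
implementation):

{code}
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.fs.FSDataOutputStream;

public class RicherHadoopDataOutputStream extends OutputStream {

    private final FSDataOutputStream delegate;

    public RicherHadoopDataOutputStream(FSDataOutputStream delegate) {
        this.delegate = delegate;
    }

    @Override
    public void write(int b) throws IOException {
        delegate.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        delegate.write(b, off, len);
    }

    // Forward flush() to Hadoop instead of inheriting OutputStream's no-op.
    @Override
    public void flush() throws IOException {
        delegate.flush();
    }

    @Override
    public void close() throws IOException {
        delegate.close();
    }

    public long getPos() throws IOException {
        return delegate.getPos();
    }

    public void hflush() throws IOException {
        delegate.hflush();
    }

    public void hsync() throws IOException {
        delegate.hsync();
    }

    // Escape hatch for anything not forwarded explicitly.
    public FSDataOutputStream getWrappedStream() {
        return delegate;
    }
}
{code}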


