[jira] [Commented] (FLINK-8580) No easy way (or issues when trying?) to handle multiple yarn sessions and choose at runtime the one to submit a ha streaming job
[ https://issues.apache.org/jira/browse/FLINK-8580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177709#comment-17177709 ] Arnaud Linz commented on FLINK-8580: Actually my cluster is a Hadoop/YARN cluster, not a set of standalone clusters. I am just trying to launch each streaming application in a separate YARN container. > No easy way (or issues when trying?) to handle multiple yarn sessions and > choose at runtime the one to submit a ha streaming job > > > Key: FLINK-8580 > URL: https://issues.apache.org/jira/browse/FLINK-8580 > Project: Flink > Issue Type: Improvement > Components: Command Line Client, Deployment / YARN, Runtime / > Coordination > Affects Versions: 1.3.2 > Reporter: Arnaud Linz > Priority: Major > > Hello, > I am using Flink 1.3.2 and I'm struggling to achieve something that should be > simple. > For isolation reasons, I want to start multiple long-living YARN session > containers (with the same user) and choose at run-time, when I start an HA > streaming app, which container will hold it. > I start my yarn session with the command-line option: > -Dyarn.properties-file.location=mydir > The session is created and a .yarn-properties-$USER file is generated. > And I've tried the following to submit my job: > > *CASE 1* > *flink-conf.yaml*: yarn.properties-file.location: mydir > *flink run options*: none > * Uses ZooKeeper and works, but I cannot choose the container as the > property file is global. 
> > *CASE 2* > *flink-conf.yaml*: nothing > *flink run options*: -yid applicationId > * Does not use ZooKeeper; tries to connect to the YARN job manager but fails with a > "Job submission to the JobManager timed out" error > > *CASE 3* > *flink-conf.yaml*: nothing > *flink run options*: -yid applicationId and -yD with all dynamic properties > found in the "dynamicPropertiesString" of the .yarn-properties-$USER file > * Same as case 2 > > *CASE 4* > *flink-conf.yaml*: nothing > *flink run options*: -yD yarn.properties-file.location=mydir > * Tries to connect to the local (non-YARN) job manager (and fails) > > *CASE 5* > Even weirder: > *flink-conf.yaml*: yarn.properties-file.location: mydir > *flink run options*: -yD yarn.properties-file.location=mydir > * Still tries to connect to the local (non-YARN) job manager! > > Without any other solution, I've made a shell script that copies the original > content of FLINK_CONF_DIR into a temporary dir, modifies flink-conf.yaml to set > yarn.properties-file.location, and changes FLINK_CONF_DIR to that temp dir > before executing flink to submit the job. 
> I am now able to select the container I want, but I think it should be made > simpler… > > Logs extracts: > *CASE 1:* > {{2018:02:01 15:43:20 - Waiting until all TaskManagers have > connected}}{{2018:02:01 15:43:20 - Starting client actor > system.}}{{2018:02:01 15:43:20 - Starting > ZooKeeperLeaderRetrievalService.}}{{2018:02:01 15:43:20 - Trying to select > the network interface and address to use by connecting to the leading > JobManager.}}{{2018:02:01 15:43:20 - TaskManager will try to connect for > 1 milliseconds before falling back to heuristics}}{{2018:02:01 15:43:21 - > Retrieved new target address > elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr/10.136.170.193:33970.}}{{2018:02:01 > 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.}}{{2018:02:01 15:43:21 - > Slf4jLogger started}}{{2018:02:01 15:43:21 - Starting remoting}}{{2018:02:01 > 15:43:21 - Remoting started; listening on addresses > :[akka.tcp://fl...@elara-edge-u2-n01.dev.mlb.jupiter.nbyt.fr:36340]}}{{2018:02:01 > 15:43:21 - Starting ZooKeeperLeaderRetrievalService.}}{{2018:02:01 15:43:21 > - Stopping ZooKeeperLeaderRetrievalService.}}{{2018:02:01 15:43:21 - > TaskManager status (2/1)}}{{2018:02:01 15:43:21 - All TaskManagers are > connected}}{{2018:02:01 15:43:21 - Submitting job with JobID: > f69197b0b80a76319a87bde10c1e3f77. 
Waiting for job completion.}}{{2018:02:01 > 15:43:21 - Starting ZooKeeperLeaderRetrievalService.}}{{2018:02:01 15:43:21 - > Received SubmitJobAndWait(JobGraph(jobId: f69197b0b80a76319a87bde10c1e3f77)) > but there is no connection to a JobManager yet.}}{{2018:02:01 15:43:21 - > Received job SND-IMP-SIGNAST > (f69197b0b80a76319a87bde10c1e3f77).}}{{2018:02:01 15:43:21 - Disconnect from > JobManager null.}}{{2018:02:01 15:43:21 - Connect to JobManager > Actor[akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].}}{{2018:02:01 > 15:43:21 - Connected to JobManager at > Actor[akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245] > with leader session id 388af5b8--4923-8ee4-8a4b9bfbb0b9.}}{{2018:02:01 >
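The workaround described in the message above (copy FLINK_CONF_DIR to a temporary directory, force yarn.properties-file.location in the copied flink-conf.yaml, then point FLINK_CONF_DIR at the copy before invoking flink) can be sketched as follows. The reporter's actual workaround was a shell script; this Java version is only an illustration, and the class and method names are hypothetical, not part of Flink.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.stream.Stream;

/**
 * Sketch of the FLINK-8580 workaround: build a per-job copy of the Flink
 * configuration directory with yarn.properties-file.location forced, so the
 * caller can export FLINK_CONF_DIR pointing at the copy before "flink run".
 * Illustrative only; not Flink API.
 */
public class ConfDirWorkaround {

    /** Copies confDir into targetDir and appends the yarn properties-file setting. */
    public static Path prepareConfDir(Path confDir, Path targetDir, String yarnPropsDir)
            throws IOException {
        Files.createDirectories(targetDir);
        // Copy every file of the original configuration directory.
        try (Stream<Path> files = Files.list(confDir)) {
            for (Path src : (Iterable<Path>) files::iterator) {
                Files.copy(src, targetDir.resolve(src.getFileName()),
                        StandardCopyOption.REPLACE_EXISTING);
            }
        }
        // Force the per-session property file location in the copied flink-conf.yaml.
        Path conf = targetDir.resolve("flink-conf.yaml");
        String line = System.lineSeparator()
                + "yarn.properties-file.location: " + yarnPropsDir
                + System.lineSeparator();
        Files.write(conf, line.getBytes(),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        return targetDir;
        // The caller would then set FLINK_CONF_DIR=targetDir and run the flink CLI.
    }
}
```

This only selects the session at submission time; it does not change the session itself, which is why the reporter notes it "should be made simpler" in the CLI.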
[jira] [Commented] (FLINK-16509) FlinkKafkaConsumerBase tries to log a context that may not have been initialized and fails
[ https://issues.apache.org/jira/browse/FLINK-16509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17055651#comment-17055651 ] Arnaud Linz commented on FLINK-16509: - I was too quick to open the case. It was a side effect of a test artifact. When everything is correct, a context is provided. > FlinkKafkaConsumerBase tries to log a context that may not have been > initialized and fails > -- > > Key: FLINK-16509 > URL: https://issues.apache.org/jira/browse/FLINK-16509 > Project: Flink > Issue Type: Bug > Affects Versions: 1.10.0 > Environment: Unit test on local cluster, calling a unit test local > kafka server. > Reporter: Arnaud Linz > Priority: Major > > The new code of FlinkKafkaConsumerBase#initializeState() logs restored state > with: > {code:java} > (...) > LOG.info("Consumer subtask {} restored state: {}.", > getRuntimeContext().getIndexOfThisSubtask(), restoredState); > } > else { >    LOG.info("Consumer subtask {} has no restore state.", > getRuntimeContext().getIndexOfThisSubtask()); > } > {code} > > whereas the old (1.8.0) class logged without calling getRuntimeContext: > > {code:java} > (...) > LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", > restoredState); > } > else { > LOG.info("No restore state for FlinkKafkaConsumer."); > }{code} > > This causes a regression in my Kafka source unit test with the exception: > {code:java} > java.lang.IllegalStateException: The runtime context has not been > initialized. 
> at > org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53) > > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886) > > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) > > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) > > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) > > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) > > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) > at java.lang.Thread.run(Thread.java:748) > {code} > > The context is not always available at that point (initializeState being > called before open, I guess). -- This message was sent by Atlassian Jira (v8.3.4#803005)
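The failure mode reported above can be reproduced in miniature: getRuntimeContext() throws IllegalStateException until the context is injected, so any log statement that dereferences it inside initializeState() fails. The class below is a minimal stand-in for illustration (it is not Flink's actual AbstractRichFunction), showing a guarded logging helper that degrades gracefully when the context is absent.

```java
/**
 * Minimal stand-in for a rich function whose runtime context is injected late.
 * getRuntimeContext() mirrors the AbstractRichFunction behavior of throwing
 * when uninitialized; describeSubtask() is a guarded alternative that is safe
 * to call from initializeState(). Names are illustrative, not Flink API.
 */
public class GuardedLogging {

    private Object runtimeContext; // would be a RuntimeContext in Flink; null before open()

    /** Throws if called before the context is injected, as in the reported stack trace. */
    public Object getRuntimeContext() {
        if (runtimeContext == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
        return runtimeContext;
    }

    /** Never throws: falls back to a neutral label, like the 1.8.0 log messages did. */
    public String describeSubtask() {
        if (runtimeContext == null) {
            return "uninitialized subtask";
        }
        return "subtask " + runtimeContext;
    }

    /** Simulates the framework injecting the context (normally done before open()). */
    public void setRuntimeContext(Object ctx) {
        this.runtimeContext = ctx;
    }
}
```

As the follow-up comment notes, in a correctly wired job the context is already set when initializeState() runs; the guard only matters for harnesses that restore state without injecting a context first.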
[jira] [Closed] (FLINK-16509) FlinkKafkaConsumerBase tries to log a context that may not have been initialized and fails
[ https://issues.apache.org/jira/browse/FLINK-16509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnaud Linz closed FLINK-16509. --- Release Note: It was a side effect. When everything goes right, we do have a context here. Resolution: Not A Problem > FlinkKafkaConsumerBase tries to log a context that may not have been > initialized and fails > -- > > Key: FLINK-16509 > URL: https://issues.apache.org/jira/browse/FLINK-16509 > Project: Flink > Issue Type: Bug >Affects Versions: 1.10.0 > Environment: Unit test on local cluster, calling a unit test local > kafka server. >Reporter: Arnaud Linz >Priority: Major > > New code of FlinkKafkaConsumerBase#initializeState(), logs restored state > with: > {code:java} > (...) > LOG.info("Consumer subtask {} restored state: {}.", > getRuntimeContext().getIndexOfThisSubtask(), restoredState); > } > else { >LOG.info("Consumer subtask {} has no restore state.", > getRuntimeContext().getIndexOfThisSubtask()); > } > {code} > > where as old (1.8.0) class was logging without calling getRuntimeContext : > > {code:java} > (...) > LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", > restoredState); > } > else { > LOG.info("No restore state for FlinkKafkaConsumer."); > }{code} > > This causes a regression in my Kafka source unit test with exception: > {code:java} > java.lang.IllegalStateException: The runtime context has not been > initialized. 
> at > org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53) > > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886) > > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) > > at > org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) > > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) > > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) > > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) > at java.lang.Thread.run(Thread.java:748) > {code} > > As the context is not always available at that point (initalizeState being > called before open I guess) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16509) FlinkKafkaConsumerBase tries to log a context that may not have been initialized and fails
[ https://issues.apache.org/jira/browse/FLINK-16509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnaud Linz updated FLINK-16509: Description: New code of FlinkKafkaConsumerBase#initializeState(), logs restored state with: {code:java} (...) LOG.info("Consumer subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restoredState); } else { LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask()); } {code} where as old (1.8.0) class was logging without calling getRuntimeContext : {code:java} (...) LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState); } else { LOG.info("No restore state for FlinkKafkaConsumer."); }{code} This causes a regression in my Kafka source unit test with exception: {code:java} java.lang.IllegalStateException: The runtime context has not been initialized. at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:748) {code} As the context is not always available at that point (initalizeState being called before open I guess) was: New code of FlinkKafkaConsumerBase#initializeState(), logs restored state with: {code:java} (...) LOG.info("Consumer subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restoredState); } else { LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask()); } {code} where as old (1.8.0) class was logging without calling getRuntimeContext : {code:java} (...) LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState); } else { LOG.info("No restore state for FlinkKafkaConsumer."); }{code} This causes a regression in my Kafka source unit test with exception: {code:java} java.lang.IllegalStateException: The runtime context has not been initialized. 
at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:748) {code} As the context is not always available at that point (initalizeState being called before open I guess) > FlinkKafkaConsumerBase tries to log a context that may not have
[jira] [Updated] (FLINK-16509) FlinkKafkaConsumerBase tries to log a context that may not have been initialized and fails
[ https://issues.apache.org/jira/browse/FLINK-16509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnaud Linz updated FLINK-16509: Description: New code of FlinkKafkaConsumerBase#initializeState(), logs restored state with: {code:java} (...) LOG.info("Consumer subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restoredState); } else { LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask()); } {code} where as old (1.8.0) class was logging without calling getRuntimeContext : {code:java} (...) LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState); } else { LOG.info("No restore state for FlinkKafkaConsumer."); }{code} This causes a regression in my Kafka source unit test with exception: {code:java} java.lang.IllegalStateException: The runtime context has not been initialized. at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:748) {code} As the context is not always available at that point (initalizeState being called before open I guess was: New code of FlinkKafkaConsumerBase#initializeState(), logs restored state with: {{ LOG.info("Consumer subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restoredState); } else \{ LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask()); }}} where as old (1.8.0) class was logging without calling getRuntimeContext : {{ LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState); } else \{ LOG.info("No restore state for FlinkKafkaConsumer."); }}} This causes a regression in my Kafka source unit test with exception: {{ java.lang.IllegalStateException: The runtime context has not been initialized.}} {{ at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)}} {{ at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)}} {{ As the context is not always available at that point (initalizeState being called before open I guess)}} {{ }} {{ }} > FlinkKafkaConsumerBase tries to log a context that may not have been > initialized and fails > -- > > Key: FLINK-16509 > URL: https://issues.apache.org/jira/browse/FLINK-16509 > Project: Flink > Issue Type: Bug >Affects Versions: 1.10.0 > Environment: Unit test on local cluster, calling a unit test local > kafka server. >Reporter: Arnaud Linz >Priority: Major > > New code of FlinkKafkaConsumerBase#initializeState(), logs restored state > with: > {code:java} > (...) 
> LOG.info("Consumer subtask {} restored state: {}.", > getRuntimeContext().getIndexOfThisSubtask(), restoredState); > } > else { >LOG.info("Consumer subtask {} has no restore state.", > getRuntimeContext().getIndexOfThisSubtask()); > } > {code} > > where as old (1.8.0) class was logging without calling getRuntimeContext : > > {code:java} > (...) > LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", > restoredState); > } > else { > LOG.info("No restore state for FlinkKafkaConsumer."); > }{code} > > This causes a regression in my Kafka source unit test with exception: > {code:java} > java.lang.IllegalStateException: The runtime context has
[jira] [Updated] (FLINK-16509) FlinkKafkaConsumerBase tries to log a context that may not have been initialized and fails
[ https://issues.apache.org/jira/browse/FLINK-16509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnaud Linz updated FLINK-16509: Description: New code of FlinkKafkaConsumerBase#initializeState(), logs restored state with: {code:java} (...) LOG.info("Consumer subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restoredState); } else { LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask()); } {code} where as old (1.8.0) class was logging without calling getRuntimeContext : {code:java} (...) LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState); } else { LOG.info("No restore state for FlinkKafkaConsumer."); }{code} This causes a regression in my Kafka source unit test with exception: {code:java} java.lang.IllegalStateException: The runtime context has not been initialized. at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:748) {code} As the context is not always available at that point (initalizeState being called before open I guess) was: New code of FlinkKafkaConsumerBase#initializeState(), logs restored state with: {code:java} (...) LOG.info("Consumer subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restoredState); } else { LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask()); } {code} where as old (1.8.0) class was logging without calling getRuntimeContext : {code:java} (...) LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState); } else { LOG.info("No restore state for FlinkKafkaConsumer."); }{code} This causes a regression in my Kafka source unit test with exception: {code:java} java.lang.IllegalStateException: The runtime context has not been initialized. 
at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:748) {code} As the context is not always available at that point (initalizeState being called before open I guess > FlinkKafkaConsumerBase tries to log a context that may not have
[jira] [Updated] (FLINK-16509) FlinkKafkaConsumerBase tries to log a context that may not have been initialized and fails
[ https://issues.apache.org/jira/browse/FLINK-16509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnaud Linz updated FLINK-16509: Description: New code of FlinkKafkaConsumerBase#initializeState(), logs restored state with: {{ LOG.info("Consumer subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restoredState); } else \{ LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask()); }}} where as old (1.8.0) class was logging without calling getRuntimeContext : {{ LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState); } else \{ LOG.info("No restore state for FlinkKafkaConsumer."); }}} This causes a regression in my Kafka source unit test with exception: {{ java.lang.IllegalStateException: The runtime context has not been initialized.}} {{ at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)}} {{ at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)}} {{ As the context is not always available at that point (initalizeState being called before open I guess)}} {{ }} {{ }} was: New code of FlinkKafkaConsumerBase#initializeState(), logs restored state with: ``` LOG.info("Consumer subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restoredState); } else \{ LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask()); } ``` where as old (1.8.0) class was logging without calling getRuntimeContext : ``` LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState); } else \{ LOG.info("No restore state for FlinkKafkaConsumer."); } ``` This causes a regression in my Kafka source unit test with exception: ``` java.lang.IllegalStateException: The runtime context has not been initialized. 
at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886) ``` As the context is not always available at that point (initalizeState being called before open I guess) > FlinkKafkaConsumerBase tries to log a context that may not have been > initialized and fails > -- > > Key: FLINK-16509 > URL: https://issues.apache.org/jira/browse/FLINK-16509 > Project: Flink > Issue Type: Bug >Affects Versions: 1.10.0 > Environment: Unit test on local cluster, calling a unit test local > kafka server. >Reporter: Arnaud Linz >Priority: Major > > New code of FlinkKafkaConsumerBase#initializeState(), logs restored state > with: > {{ LOG.info("Consumer subtask {} restored state: {}.", > getRuntimeContext().getIndexOfThisSubtask(), restoredState); } else \{ > LOG.info("Consumer subtask {} has no restore state.", > getRuntimeContext().getIndexOfThisSubtask()); }}} > > where as old (1.8.0) class was logging without calling getRuntimeContext : > > {{ LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", > restoredState); } else \{ LOG.info("No restore state for > FlinkKafkaConsumer."); }}} > > This causes a regression in my Kafka source unit test with exception: > {{ java.lang.IllegalStateException: The runtime context has not been > initialized.}} > {{ at > org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)}} > {{ at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)}} > {{ As the context is not always available at that point (initalizeState being > called before open I guess)}} > {{ }} > {{ }} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16509) FlinkKafkaConsumerBase tries to log a context that may not have been initialized and fails
Arnaud Linz created FLINK-16509: --- Summary: FlinkKafkaConsumerBase tries to log a context that may not have been initialized and fails Key: FLINK-16509 URL: https://issues.apache.org/jira/browse/FLINK-16509 Project: Flink Issue Type: Bug Affects Versions: 1.10.0 Environment: Unit test on local cluster, calling a unit test local kafka server. Reporter: Arnaud Linz New code of FlinkKafkaConsumerBase#initializeState(), logs restored state with: ``` LOG.info("Consumer subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restoredState); } else \{ LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask()); } ``` where as old (1.8.0) class was logging without calling getRuntimeContext : ``` LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState); } else \{ LOG.info("No restore state for FlinkKafkaConsumer."); } ``` This causes a regression in my Kafka source unit test with exception: ``` java.lang.IllegalStateException: The runtime context has not been initialized. at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886) ``` As the context is not always available at that point (initalizeState being called before open I guess) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end
[ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnaud Linz resolved FLINK-10832. - Resolution: Won't Fix Release Note: Not a Flink problem but a JDK one; solved by upgrading the JDK. > StreamExecutionEnvironment.execute() does not return when all sources end > - > > Key: FLINK-10832 > URL: https://issues.apache.org/jira/browse/FLINK-10832 > Project: Flink > Issue Type: Bug > Components: Core > Affects Versions: 1.5.5, 1.6.2 > Reporter: Arnaud Linz > Priority: Critical > Attachments: flink-10832.zip, log.txt > > > In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), this code never ends:
> {code:java}
> public void testFlink() throws Exception {
>     // get the execution environment
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     // get input data
>     final DataStreamSource<String> text = env.addSource(new SourceFunction<String>() {
>         @Override
>         public void run(final SourceContext<String> ctx) throws Exception {
>             for (int count = 0; count < 5; count++) {
>                 ctx.collect(String.valueOf(count));
>             }
>         }
>
>         @Override
>         public void cancel() {
>         }
>     });
>     text.print().setParallelism(1);
>     env.execute("Simple Test");
>     // Never ends!
> }
> {code}
> It's critical for us, as we heavily rely on this "source exhaustion stop" > mechanism to achieve proper stop of streaming applications from their own > code, so it prevents us from using the latest Flink versions. > > The log extract shows that the local cluster tried to shut down, but could > not do it for no apparent reason: > > {{[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. > Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to > RUNNING. 
(org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}} > {{[2018-11-07 11:11:13,840] INFO No state backend has been configured, using > default (Memory / JobManager) MemoryStateBackend (data in heap memory / > checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', > asynchronous: TRUE, maxStateSize: 5242880) > (org.apache.flink.streaming.runtime.tasks.StreamTask:230)}} > {{0}} > {{1}} > {{2}} > {{3}} > {{4}} > {{[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to > Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to > FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)}} > {{[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom > Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). > (org.apache.flink.runtime.taskmanager.Task:818)}} > {{[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed > for task Source: Custom Source -> Sink: Print to Std. Out (1/1) > (07ae66bef91de06205cf22a337ea1fe2) [FINISHED] > (org.apache.flink.runtime.taskmanager.Task:845)}} > {{[2018-11-07 11:11:13,899] INFO Un-registering task and sending final > execution state FINISHED to JobManager for task Source: Custom Source -> > Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. > (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)}} > {{[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to > Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to > FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}} > {{[2018-11-07 11:11:13,907] INFO Job Simple Test > (0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. > (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)}} > {{[2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job > 0ef8697ca98f6d2b565ed928d17c8a49. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)}} > {{[2018-11-07 11:11:13,908] INFO Shutting down > (org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)}} > {{[2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster > (org.apache.flink.runtime.minicluster.MiniCluster:427)}} > {{[2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. > (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)}} > {{[2018-11-07 11:11:23,582] INFO Stopping TaskExecutor > akka://flink/user/taskmanager_0. > (org.apache.flink.runtime.taskexecutor.TaskExecutor:291)}} > {{[2018-11-07 11:11:23,583] INFO Shutting down > TaskExecutorLocalStateStoresManager. > (org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager:213)}} > {{[2018-11-07 11:11:23,591] INFO I/O manager removed spill file
[jira] [Comment Edited] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end
[ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711803#comment-16711803 ] Arnaud Linz edited comment on FLINK-10832 at 12/7/18 3:37 PM:
Okay, it seems to be a JDK-related issue.
- With 1.8.0_31 x64 (Windows) it fails.
- With 1.8.0_172 amd64 (CentOS) it works.
- Also tested OK with the latest JDK 1.8 for Windows.
was (Author: arnaudl): Okay, it seems to be a JDK related issue. With 1.8.0_31x64 (windows) it fails. with 1.8.0_172-amd64 (centOS) it works.
[jira] [Comment Edited] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end
[ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711803#comment-16711803 ] Arnaud Linz edited comment on FLINK-10832 at 12/6/18 6:22 PM:
Okay, it seems to be a JDK-related issue. With 1.8.0_31 x64 (Windows) it fails; with 1.8.0_172 amd64 (CentOS) it works.
was (Author: arnaudl): Which jdk are you using?
[jira] [Commented] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end
[ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711803#comment-16711803 ] Arnaud Linz commented on FLINK-10832:
Which JDK are you using?
[jira] [Commented] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end
[ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16705012#comment-16705012 ] Arnaud Linz commented on FLINK-10832:
I've corrected my corrupted Maven cache; now the run is fine. However, it still does not end the local cluster with 1.7.0...
[jira] [Commented] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end
[ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704554#comment-16704554 ] Arnaud Linz commented on FLINK-10832:
Okay, I've just tested with the new 1.7.0. All I did was replace the version in the pom, but the test fails with:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/hash/Hashing
    at org.apache.flink.streaming.api.graph.StreamGraphHasherV2.traverseStreamGraphAndGenerateHashes(StreamGraphHasherV2.java:80)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:145)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:93)
    at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:669)
    at org.apache.flink.optimizer.plan.StreamingPlan.getJobGraph(StreamingPlan.java:40)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:92)
    at flink.flink_10832.App.testFlink10832(App.java:60)
    at flink.flink_10832.App.main(App.java:31)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.guava18.com.google.common.hash.Hashing
    at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 8 more

It seems that your Maven package for 1.7.0 is not complete... And I'm stuck:
- Below 1.6.0: the use case fails with an out-of-memory error.
- 1.6.0: the use case succeeds, but YARN containers are never killed => no go.
- Above 1.6.0: the local cluster does not end.

What do you mean with "I ran the test from the JIRA once each on master/1.6.2 but it worked fine"? Did you not use any Flink Maven dependency, but run the test from the Flink project itself? Could it be a jar packaging issue?
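One way to check whether the jars on the classpath actually bundle the shaded Guava class from the stack trace is a Class.forName probe (a generic JVM diagnostic sketch, not a Flink utility; the class name is taken from the error above):

```java
// Diagnostic sketch: probe the classpath for a class by name without
// initializing it, to distinguish "jar missing" from other failures.
public class ShadedClassProbe {

    static boolean isPresent(String className) {
        try {
            // initialize=false: just resolve the class, don't run static init.
            Class.forName(className, false, ShadedClassProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Prints false on a classpath that lacks the flink-shaded-guava jar,
        // which is exactly the condition behind the NoClassDefFoundError above.
        System.out.println(isPresent(
                "org.apache.flink.shaded.guava18.com.google.common.hash.Hashing"));
    }
}
```

If the probe prints false while the application pom looks correct, the likely culprits are an incomplete local repository or a dependency whose transitive flink-shaded artifacts were excluded.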
[jira] [Commented] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end
[ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694731#comment-16694731 ] Arnaud Linz commented on FLINK-10832:
Strange... I can reproduce it 100% of the time. Have you tried the attached Maven project?
[jira] [Comment Edited] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end
[ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694517#comment-16694517 ] Arnaud Linz edited comment on FLINK-10832 at 11/21/18 10:24 AM: The main thread hangs in the MiniCluster.close() method on this future get : {{closeAsync().get();}} but the terminationFuture returned by closeAsync does not complete (but all its fields are null under the debugger) was (Author: arnaudl): The main thread hangs in the MiniCluster.close() method on this future get : {{closeAsync().get();}} > StreamExecutionEnvironment.execute() does not return when all sources end > - > > Key: FLINK-10832 > URL: https://issues.apache.org/jira/browse/FLINK-10832 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.5.5, 1.6.2 >Reporter: Arnaud Linz >Priority: Critical > Attachments: flink-10832.zip, log.txt > > > In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), > This code never ends : > *public* *void* testFlink() *throws* Exception { > // get the execution environment > *final* StreamExecutionEnvironment env = > StreamExecutionEnvironment._getExecutionEnvironment_(); > // get input data > *final* DataStreamSource text = env.addSource(*new* > +SourceFunction()+ { > @Override > *public* *void* run(*final* SourceContext ctx) *throws* > Exception { > *for* (*int* count = 0; count < 5; count++) { > ctx.collect(String._valueOf_(count)); > } > } > @Override > *public* *void* cancel() { > } > }); > text.print().setParallelism(1); > env.execute("Simple Test"); > // Never ends ! > } > > It's critical for us as we heavily rely on this "source exhaustion stop" > mechanism to achieve proper stop of streaming applications from their own > code, so it prevents us from using the last flink versions. > > The log extract shows that the local cluster tried to shut down, but could > not do it for no apparent reason: > > {{[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. 
> Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to > RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}} > {{[2018-11-07 11:11:13,840] INFO No state backend has been configured, using > default (Memory / JobManager) MemoryStateBackend (data in heap memory / > checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', > asynchronous: TRUE, maxStateSize: 5242880) > (org.apache.flink.streaming.runtime.tasks.StreamTask:230)}} > {{0}} > {{1}} > {{2}} > {{3}} > {{4}} > {{[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to > Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to > FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)}} > {{[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom > Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). > (org.apache.flink.runtime.taskmanager.Task:818)}} > {{[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed > for task Source: Custom Source -> Sink: Print to Std. Out (1/1) > (07ae66bef91de06205cf22a337ea1fe2) [FINISHED] > (org.apache.flink.runtime.taskmanager.Task:845)}} > {{[2018-11-07 11:11:13,899] INFO Un-registering task and sending final > execution state FINISHED to JobManager for task Source: Custom Source -> > Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. > (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)}} > {{[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to > Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to > FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}} > {{[2018-11-07 11:11:13,907] INFO Job Simple Test > (0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. > (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)}} > {{[2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job > 0ef8697ca98f6d2b565ed928d17c8a49. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)}} > {{[2018-11-07 11:11:13,908] INFO Shutting down > (org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)}} > {{[2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster > (org.apache.flink.runtime.minicluster.MiniCluster:427)}} > {{[2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. > (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)}} > {{[2018-11-07 11:11:23,582] INFO Stopping
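The hang pattern described in the comment above (a blocking get() on a termination future that is never completed) can be sketched outside Flink. This is a plain-JDK illustration, not Flink code, and the class name is made up for the example; the point is that a timed get() turns a silent hang into a diagnosable timeout:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureHangDemo {
    public static void main(String[] args) throws Exception {
        // Stand-in for the termination future: nobody ever completes it,
        // so a plain get() would block the calling thread forever.
        CompletableFuture<Void> termination = new CompletableFuture<>();

        // A timed get() surfaces the hang instead of freezing the caller.
        try {
            termination.get(1, TimeUnit.SECONDS);
            System.out.println("completed");
        } catch (TimeoutException e) {
            System.out.println("still pending after 1s");
        }
    }
}
```

Against an uncompleted future this prints "still pending after 1s"; a debugger attached at that point shows the blocked thread, which matches the behaviour observed in MiniCluster.close().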
[jira] [Commented] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end
[ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694517#comment-16694517 ] Arnaud Linz commented on FLINK-10832: - The main thread hangs in the MiniCluster.close() method on this future get : {{closeAsync().get();}}
[jira] [Commented] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end
[ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694407#comment-16694407 ] Arnaud Linz commented on FLINK-10832: - Sorry for the long delay, I did not get the notification; that's not a mark of disinterest :) I've attached a minimal project that reproduces it, and the log it produces.
[jira] [Updated] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end
[ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnaud Linz updated FLINK-10832: Attachment: (was: pom.xml)
[jira] [Updated] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end
[ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnaud Linz updated FLINK-10832: Attachment: log.txt
[jira] [Updated] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end
[ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnaud Linz updated FLINK-10832: Attachment: flink-10832.zip
[jira] [Updated] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end
[ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnaud Linz updated FLINK-10832: Attachment: pom.xml
[jira] [Commented] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end
[ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694375#comment-16694375 ] Arnaud Linz commented on FLINK-10832: - Sorry for the long reply, I did not get the post notification. Your question made me think that you can't reproduce this issue, so I've made a minimal project and I could not reproduce it either. So it's probably a jar conflict with one of my project dependencies. Based on this, I will conduct further investigations to identify the conflict and keep you informed. > StreamExecutionEnvironment.execute() does not return when all sources end > - > > Key: FLINK-10832 > URL: https://issues.apache.org/jira/browse/FLINK-10832 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.5.5, 1.6.2 >Reporter: Arnaud Linz >Priority: Critical > > In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), > This code never ends : > *public* *void* testFlink() *throws* Exception { > // get the execution environment > *final* StreamExecutionEnvironment env = > StreamExecutionEnvironment._getExecutionEnvironment_(); > // get input data > *final* DataStreamSource text = env.addSource(*new* > +SourceFunction()+ { > @Override > *public* *void* run(*final* SourceContext ctx) *throws* > Exception { > *for* (*int* count = 0; count < 5; count++) { > ctx.collect(String._valueOf_(count)); > } > } > @Override > *public* *void* cancel() { > } > }); > text.print().setParallelism(1); > env.execute("Simple Test"); > // Never ends ! > } > > It's critical for us as we heavily rely on this "source exhaustion stop" > mechanism to achieve proper stop of streaming applications from their own > code, so it prevents us from using the last flink versions. > > The log extract shows that the local cluster tried to shut down, but could > not do it for no apparent reason: > > {{[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. 
> Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to > RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}} > {{[2018-11-07 11:11:13,840] INFO No state backend has been configured, using > default (Memory / JobManager) MemoryStateBackend (data in heap memory / > checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', > asynchronous: TRUE, maxStateSize: 5242880) > (org.apache.flink.streaming.runtime.tasks.StreamTask:230)}} > {{0}} > {{1}} > {{2}} > {{3}} > {{4}} > {{[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to > Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to > FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)}} > {{[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom > Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). > (org.apache.flink.runtime.taskmanager.Task:818)}} > {{[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed > for task Source: Custom Source -> Sink: Print to Std. Out (1/1) > (07ae66bef91de06205cf22a337ea1fe2) [FINISHED] > (org.apache.flink.runtime.taskmanager.Task:845)}} > {{[2018-11-07 11:11:13,899] INFO Un-registering task and sending final > execution state FINISHED to JobManager for task Source: Custom Source -> > Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. > (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)}} > {{[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to > Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to > FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}} > {{[2018-11-07 11:11:13,907] INFO Job Simple Test > (0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. > (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)}} > {{[2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job > 0ef8697ca98f6d2b565ed928d17c8a49. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)}} > {{[2018-11-07 11:11:13,908] INFO Shutting down > (org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)}} > {{[2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster > (org.apache.flink.runtime.minicluster.MiniCluster:427)}} > {{[2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. > (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)}} > {{[2018-11-07 11:11:23,582] INFO Stopping TaskExecutor > akka://flink/user/taskmanager_0. >
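The comment above suspects a jar conflict between the failing project's dependencies and the minimal reproducer. One way to hunt such a conflict is to save the `mvn dependency:tree` output of both projects and compare the resolved versions; the helper below is a hypothetical sketch (the function name and the `group:artifact:jar:version` coordinate layout are assumptions, not part of the issue).

```shell
# Hypothetical helper for the jar-conflict investigation: given two saved
# `mvn dependency:tree` outputs (failing project vs. minimal reproducer),
# print the "group:artifact version" pairs that appear in only one of them.
diff_dependency_versions() {
  local left="$1" right="$2"

  # Pull "group:artifact:jar:version" coordinates out of a tree dump and
  # reduce them to sorted, unique "group:artifact version" pairs.
  extract() {
    grep -o '[A-Za-z0-9_.-]*:[A-Za-z0-9_.-]*:jar:[A-Za-z0-9_.-]*' "$1" \
      | awk -F: '{ print $1 ":" $2 " " $4 }' \
      | sort -u
  }

  # comm -3 keeps entries unique to either side, i.e. version mismatches.
  comm -3 <(extract "$left") <(extract "$right")
}
```

Running it on the two dumps should surface any dependency whose resolved version differs between the working minimal project and the failing one.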
[jira] [Issue Comment Deleted] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end
[ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnaud Linz updated FLINK-10832: Comment: was deleted (was: Sorry for the late reply, I did not get the notification for your post. Your question made me suspect that you could not reproduce this issue, so I built a minimal project and could not reproduce it either. It is therefore probably a jar conflict with one of my project dependencies. I will investigate further to identify the conflict and keep you informed.)
[jira] [Updated] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end
[ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnaud Linz updated FLINK-10832: Description: In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), this code never ends: …
[jira] [Updated] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end
[ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnaud Linz updated FLINK-10832: Description: In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), this code never ends: …
[jira] [Updated] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end
[ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnaud Linz updated FLINK-10832: Description: In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), this code never ends: …
[jira] [Created] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end
Arnaud Linz created FLINK-10832: --- Summary: StreamExecutionEnvironment.execute() does not return when all sources end Key: FLINK-10832 URL: https://issues.apache.org/jira/browse/FLINK-10832 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.6.2, 1.5.5 Reporter: Arnaud Linz In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), this code never ends: …
[jira] [Updated] (FLINK-10832) StreamExecutionEnvironment.execute() does not return when all sources end
[ https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnaud Linz updated FLINK-10832: Description: In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), this code never ends: …
[jira] [Updated] (FLINK-8580) No easy way (or issues when trying?) to handle multiple yarn sessions and choose at runtime the one to submit a ha streaming job
[ https://issues.apache.org/jira/browse/FLINK-8580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnaud Linz updated FLINK-8580: --- Description: Hello, I am using Flink 1.3.2 and I’m struggling to achieve something that should be simple. For isolation reasons, I want to start multiple long-living yarn session containers (with the same user) and choose at run time, when I start a HA streaming app, which container will hold it. I start my yarn session with the command line option -Dyarn.properties-file.location=mydir. The session is created and a .yarn-properties-$USER file is generated. I have tried the following to submit my job:
*CASE 1* *flink-conf.yaml*: yarn.properties-file.location: mydir. *flink run options*: none.
* Uses zookeeper and works – but I cannot choose the container, as the property file is global.
*CASE 2* *flink-conf.yaml*: nothing. *flink run options*: -yid applicationId.
* Does not use zookeeper; tries to connect to the yarn job manager but fails with a “Job submission to the JobManager timed out” error.
*CASE 3* *flink-conf.yaml*: nothing. *flink run options*: -yid applicationId and -yD with all dynamic properties found in the “dynamicPropertiesString” entry of the .yarn-properties-$USER file.
* Same as case 2.
*CASE 4* *flink-conf.yaml*: nothing. *flink run options*: -yD yarn.properties-file.location=mydir.
* Tries to connect to the local (non-yarn) job manager (and fails).
*CASE 5* Even weirder: *flink-conf.yaml*: yarn.properties-file.location: mydir. *flink run options*: -yD yarn.properties-file.location=mydir.
* Still tries to connect to the local (non-yarn) job manager!
Without any other solution, I've made a shell script that copies the original content of FLINK_CONF_DIR into a temporary dir, modifies flink-conf.yaml to set yarn.properties-file.location, and changes FLINK_CONF_DIR to that temp dir before executing flink to submit the job.
I am now able to select the container I want, but I think it should be made simpler… Log extracts:
*CASE 1:*
{{2018:02:01 15:43:20 - Waiting until all TaskManagers have connected}}
{{2018:02:01 15:43:20 - Starting client actor system.}}
{{2018:02:01 15:43:20 - Starting ZooKeeperLeaderRetrievalService.}}
{{2018:02:01 15:43:20 - Trying to select the network interface and address to use by connecting to the leading JobManager.}}
{{2018:02:01 15:43:20 - TaskManager will try to connect for 1 milliseconds before falling back to heuristics}}
{{2018:02:01 15:43:21 - Retrieved new target address elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr/10.136.170.193:33970.}}
{{2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.}}
{{2018:02:01 15:43:21 - Slf4jLogger started}}
{{2018:02:01 15:43:21 - Starting remoting}}
{{2018:02:01 15:43:21 - Remoting started; listening on addresses :[akka.tcp://fl...@elara-edge-u2-n01.dev.mlb.jupiter.nbyt.fr:36340]}}
{{2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.}}
{{2018:02:01 15:43:21 - Stopping ZooKeeperLeaderRetrievalService.}}
{{2018:02:01 15:43:21 - TaskManager status (2/1)}}
{{2018:02:01 15:43:21 - All TaskManagers are connected}}
{{2018:02:01 15:43:21 - Submitting job with JobID: f69197b0b80a76319a87bde10c1e3f77. Waiting for job completion.}}
{{2018:02:01 15:43:21 - Starting ZooKeeperLeaderRetrievalService.}}
{{2018:02:01 15:43:21 - Received SubmitJobAndWait(JobGraph(jobId: f69197b0b80a76319a87bde10c1e3f77)) but there is no connection to a JobManager yet.}}
{{2018:02:01 15:43:21 - Received job SND-IMP-SIGNAST (f69197b0b80a76319a87bde10c1e3f77).}}
{{2018:02:01 15:43:21 - Disconnect from JobManager null.}}
{{2018:02:01 15:43:21 - Connect to JobManager Actor[akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245].}}
{{2018:02:01 15:43:21 - Connected to JobManager at Actor[akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager#-1554418245] with leader session id 388af5b8--4923-8ee4-8a4b9bfbb0b9.}}
{{2018:02:01 15:43:21 - Sending message to JobManager akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager to submit job SND-IMP-SIGNAST (f69197b0b80a76319a87bde10c1e3f77) and wait for progress}}
{{2018:02:01 15:43:21 - Upload jar files to job manager akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.}}
{{2018:02:01 15:43:21 - Blob client connecting to akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager}}
{{2018:02:01 15:43:22 - Submit job to the job manager akka.tcp://fl...@elara-data-u2-n01.dev.mlb.jupiter.nbyt.fr:33970/user/jobmanager.}}
{{2018:02:01 15:43:22 - Job f69197b0b80a76319a87bde10c1e3f77 was successfully submitted to the JobManager akka://flink/deadLetters.}}
{{2018:02:01 15:43:22 - 02/01/2018 15:43:22 Job execution switched to status RUNNING.}}
*CASE 2:*
{{2018:02:01 15:48:43 - Waiting until all TaskManagers have
[jira] [Created] (FLINK-8580) No easy way (or issues when trying?) to handle multiple yarn sessions and choose at runtime the one to submit a ha streaming job
Arnaud Linz created FLINK-8580: -- Summary: No easy way (or issues when trying?) to handle multiple yarn sessions and choose at runtime the one to submit a ha streaming job Key: FLINK-8580 URL: https://issues.apache.org/jira/browse/FLINK-8580 Project: Flink Issue Type: Improvement Components: Job-Submission Affects Versions: 1.3.2 Reporter: Arnaud Linz
[jira] [Closed] (FLINK-6289) ExecutionEnvironment.readTextFile() can read gzip files & directories but not both
[ https://issues.apache.org/jira/browse/FLINK-6289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnaud Linz closed FLINK-6289. -- Resolution: Invalid Fix Version/s: 1.1.3 Actually, it works... It seems I was too quick to report a problem I cannot reproduce any longer, sorry... > ExecutionEnvironment.readTextFile() can read gzip files & directories but not > both > -- > > Key: FLINK-6289 > URL: https://issues.apache.org/jira/browse/FLINK-6289 > Project: Flink > Issue Type: Bug >Affects Versions: 1.1.3 >Reporter: Arnaud Linz >Priority: Minor > Fix For: 1.1.3 > > > When calling `ExecutionEnvironment.readTextFile()` passing a ".gz" file as > input, it deflates the file correctly. If I pass a directory as input, it > reads the files contained in the directory. But if I pass a directory > containing ".gz" files as input, it does not deflate the files and treats them > as ASCII files. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6289) ExecutionEnvironment.readTextFile() can read gzip files & directories but not both
Arnaud Linz created FLINK-6289: -- Summary: ExecutionEnvironment.readTextFile() can read gzip files & directories but not both Key: FLINK-6289 URL: https://issues.apache.org/jira/browse/FLINK-6289 Project: Flink Issue Type: Bug Affects Versions: 1.1.3 Reporter: Arnaud Linz Priority: Minor When calling `ExecutionEnvironment.readTextFile()` passing a ".gz" file as input, it deflates the file correctly. If I pass a directory as input, it reads the files contained in the directory. But if I pass a directory containing ".gz" files as input, it does not deflate the files and treats them as ASCII files.
[jira] [Created] (FLINK-5305) Documentation of collect() should mention parameter akka.framesize
Arnaud Linz created FLINK-5305: -- Summary: Documentation of collect() should mention parameter akka.framesize Key: FLINK-5305 URL: https://issues.apache.org/jira/browse/FLINK-5305 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.1.3 Reporter: Arnaud Linz Priority: Minor When calling collect() on a big dataset, if the data size exceeds the value of akka.framesize (10 MB by default), collect() freezes and "akka payload size exceeded" messages appear in the yarn logs (but neither in the flink console output nor in the UI logs). 10 MB is not that big, and its relation to akka.framesize is hard to find in the documentation (it sits in the Flink internals section). I believe the easy-to-find collect() documentation should mention this parameter to save hours of users' time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
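For reference, the parameter under discussion lives in flink-conf.yaml. A minimal sketch; the 64 MB value is an arbitrary illustrative choice, not a recommendation from this thread:

```yaml
# flink-conf.yaml: raise the maximum Akka payload size so larger
# collect() results fit in a single message. The default is 10485760b
# (10 MB); the "b" suffix means bytes. 64 MB here is illustrative only.
akka.framesize: 67108864b
```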
[jira] [Commented] (FLINK-4079) YARN properties file used for per-job cluster
[ https://issues.apache.org/jira/browse/FLINK-4079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341327#comment-15341327 ] Arnaud Linz commented on FLINK-4079: Personally, I'd like to, since it is a blocking problem on one of my environments. > YARN properties file used for per-job cluster > - > > Key: FLINK-4079 > URL: https://issues.apache.org/jira/browse/FLINK-4079 > Project: Flink > Issue Type: Bug > Components: Client >Affects Versions: 1.0.3 >Reporter: Ufuk Celebi >Assignee: Maximilian Michels > Fix For: 1.1.0, 1.0.4 > > > YARN per job clusters (flink run -m yarn-cluster) rely on the hidden YARN > properties file, which defines the container configuration. This can lead to > unexpected behaviour, because the per-job-cluster configuration is merged > with the YARN properties file (or used as only configuration source). > A user ran into this as follows: > - Create a long-lived YARN session with HA (creates a hidden YARN properties > file) > - Submits standalone batch jobs with a per job cluster (flink run -m > yarn-cluster). The batch jobs get submitted to the long lived HA cluster, > because of the properties file. > [~mxm] Am I correct in assuming that this is only relevant for the 1.0 branch > and will be fixed with the client refactoring you are working on?
[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters
[ https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324308#comment-15324308 ] Arnaud Linz commented on FLINK-3763: No, but I do remember losing patience with the RMQ source upon cancellation during my early use of it. > RabbitMQ Source/Sink standardize connection parameters > -- > > Key: FLINK-3763 > URL: https://issues.apache.org/jira/browse/FLINK-3763 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.0.1 >Reporter: Robert Batts >Assignee: Subhankar Biswas > Fix For: 1.1.0 > > > The RabbitMQ source and sink should have the same capabilities in terms of > establishing a connection; currently the sink lacks connection > parameters that are available on the source. Additionally, VirtualHost should > be an offered parameter for multi-tenant RabbitMQ clusters (if not specified, > it goes to the vhost '/'). > Connection Parameters > === > - Host - Offered on both > - Port - Source only > - Virtual Host - Neither > - User - Source only > - Password - Source only > Additionally, it might be worth offering the URI as a valid constructor argument, because > that would cover all 5 of the above parameters in a single String.
[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters
[ https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15322517#comment-15322517 ] Arnaud Linz commented on FLINK-3763: This comment is probably too late, but I have a problem with: {{while (running) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); }} combined with {{public void cancel() { running = false; } }} If no element ever arrives in the queue, nextDelivery() may wait forever, since no timeout is provided; the while (running) condition is never re-evaluated, so the cancel() method does nothing. This prevents us from properly cancelling such sources, forcing the user to kill the yarn application to get out of the workflow's perpetual "cancelling" state.
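The hazard described in the comment above can be illustrated with a small, self-contained simulation. This is not the connector's actual code: a BlockingQueue stands in for RabbitMQ's QueueingConsumer, and `poll(timeout)` plays the role of the timeout overload `nextDelivery(long)`; with a timeout, the `running` flag is re-checked periodically, so cancellation terminates the loop even when no message ever arrives.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative stand-in for the RMQ source loop; names are hypothetical.
public class CancellableSource {
    private volatile boolean running = true;
    private final BlockingQueue<String> deliveries = new LinkedBlockingQueue<>();

    /** Consume until cancelled; returns the number of messages consumed. */
    public int run() throws InterruptedException {
        int consumed = 0;
        while (running) {
            // Like consumer.nextDelivery(100): returns null on timeout
            // instead of blocking forever like the no-argument variant.
            String delivery = deliveries.poll(100, TimeUnit.MILLISECONDS);
            if (delivery == null) {
                continue; // nothing arrived: loop back and re-check the flag
            }
            consumed++;
        }
        return consumed;
    }

    public void cancel() {
        running = false; // now observed within one poll interval
    }

    public static void main(String[] args) throws Exception {
        CancellableSource source = new CancellableSource();
        Thread canceller = new Thread(() -> {
            try { Thread.sleep(250); } catch (InterruptedException ignored) { }
            source.cancel();
        });
        canceller.start();
        // Without the timeout, this call would hang forever on an empty queue.
        System.out.println("consumed=" + source.run());
        canceller.join();
    }
}
```

With the no-timeout `nextDelivery()` the same `main` would never return, which is exactly the stuck-in-"cancelling" behaviour reported above.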
[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters
[ https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306684#comment-15306684 ] Arnaud Linz commented on FLINK-3763: IMO, this source lacks many RMQ features needed to make it usable in production. VirtualHost is one of them, but I think the whole RabbitMQ connection factory should be made available to the user; currently it is not even accessible from an inherited class. It would also be a good idea to make this source truly cancellable by adding something like {{consumer.getChannel().abort(AMQP.REPLY_SUCCESS, "Cancellation requested by user");}} in the {{cancel()}} method, or else it may never end in a clean way.
[jira] [Issue Comment Deleted] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters
[ https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnaud Linz updated FLINK-3763: --- Comment: was deleted (was: IMO, this source lacks many RMQ features in order to make it usable in production. VirtualHost is one of them ; but shutdown timeout is also important, or else the stream may fail after 10 minutes of inactivity => the whole RabbitMQ factory should be made available to the user. Currently it is not even accessible from an inherited class. It would also be a good idea to make this source truly cancellable by adding something like {{consumer.getChannel().abort(AMQP.REPLY_SUCCESS, "Cancellation requested by user");}} in the {{cancel()}} method or else it may never end in a clean way. )
[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters
[ https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306678#comment-15306678 ] Arnaud Linz commented on FLINK-3763: IMO, this source lacks many RMQ features in order to make it usable in production. VirtualHost is one of them; but shutdown timeout is also important, or else the stream may fail after 10 minutes of inactivity => the whole RabbitMQ factory should be made available to the user. Currently it is not even accessible from an inherited class. It would also be a good idea to make this source truly cancellable by adding something like {{consumer.getChannel().abort(AMQP.REPLY_SUCCESS, "Cancellation requested by user");}} in the {{cancel()}} method, or else it may never end in a clean way.
[jira] [Commented] (FLINK-2617) ConcurrentModificationException when using HCatRecordReader to access a hive table
[ https://issues.apache.org/jira/browse/FLINK-2617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14740440#comment-14740440 ] Arnaud Linz commented on FLINK-2617: Well, I've searched the Apache snapshot repository for 0.9-SNAPSHOT and I've looked at http://stratosphere-bin.s3.amazonaws.com/flink-0.9-SNAPSHOT-bin-hadoop2.tgz. I was trying to avoid building it from the sources until I can find the time to configure my env. > ConcurrentModificationException when using HCatRecordReader to access a hive > table > -- > > Key: FLINK-2617 > URL: https://issues.apache.org/jira/browse/FLINK-2617 > Project: Flink > Issue Type: Bug > Components: Hadoop Compatibility >Reporter: Arnaud Linz >Assignee: Fabian Hueske >Priority: Critical > Fix For: 0.9, 0.10 > > > I don't know if it's a hcat or a flink problem, but when reading a hive table > in a cluster with many slots (20 threads per container), I systematically run > into a {{ConcurrentModificationException}} in a copy method of a > {{Configuration}} object that changes during the copy. > From what I understand, this object comes from > {{TaskAttemptContext.getConfiguration()}} created by > {{HadoopUtils.instantiateTaskAttemptContext(configuration, new > TaskAttemptID());}} > Maybe the {{job.Configuration}} object passed to the constructor of > {{HadoopInputFormatBase}} should be cloned somewhere? > Stack trace is: > {code} > org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
> org.apache.flink.client.program.Client.run(Client.java:413)
> org.apache.flink.client.program.Client.run(Client.java:356)
> org.apache.flink.client.program.Client.run(Client.java:349)
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
> com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:73)
> com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:69)
> com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:50)
> com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:88)
> com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:606)
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
> org.apache.flink.client.program.Client.run(Client.java:315)
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
> org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
> org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
> scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> akka.actor.ActorCell.invoke(ActorCell.scala:487)
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> akka.dispatch.Mailbox.run(Mailbox.scala:221)
> akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
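The failure mode in this report can be reproduced in miniature. The sketch below is not Flink or Hadoop code: a plain HashMap stands in for the shared Hadoop `Configuration`, whose copy path iterates its internal map the same way, and the put during iteration plays the role of another task thread mutating the shared object mid-copy.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

// Illustrative demo of a ConcurrentModificationException from copying a
// map-backed object while it is being mutated; all names are hypothetical.
public class CmeDemo {

    /** Mutating a HashMap while iterating it fails fast, just as a shared
     *  Configuration does when one task copies it while another adds keys. */
    static boolean mutateDuringIteration() {
        Map<String, String> conf = new HashMap<>();
        conf.put("fs.defaultFS", "hdfs://nn:8020");   // illustrative keys
        conf.put("io.file.buffer.size", "4096");
        try {
            for (String key : conf.keySet()) {
                conf.put("added.while.iterating", "x"); // "other thread's" put
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true; // the exception reported in this issue
        }
    }

    public static void main(String[] args) {
        System.out.println("caught=" + mutateDuringIteration());
        // The fix suggested in the description: clone up front (in Hadoop,
        // new Configuration(conf)) so each task iterates a private copy.
        Map<String, String> shared = new HashMap<>();
        shared.put("k", "v");
        Map<String, String> privateCopy = new HashMap<>(shared); // clone first
        shared.put("k2", "v2"); // later mutation no longer affects the copy
        System.out.println("copySize=" + privateCopy.size());
    }
}
```

Running `main` prints `caught=true` and `copySize=1`, which is why cloning the `job.Configuration` before handing it to each `HadoopInputFormatBase` instance avoids the race.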
[jira] [Commented] (FLINK-2617) ConcurrentModificationException when using HCatRecordReader to access a hive table
[ https://issues.apache.org/jira/browse/FLINK-2617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14740386#comment-14740386 ] Arnaud Linz commented on FLINK-2617: Hi, today (11-09) I've tested the 0.10 nightly build with success, thank you very much. However, I did not find the patch in the 0.9 snapshot; is it normal?
[jira] [Commented] (FLINK-2617) ConcurrentModificationException when using HCatRecordReader to access a hive table
[ https://issues.apache.org/jira/browse/FLINK-2617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736262#comment-14736262 ] Arnaud Linz commented on FLINK-2617: hive-hcatalog-core-0.14.0.jar
> ConcurrentModificationException when using HCatRecordReader to access a hive table
> Key: FLINK-2617
> URL: https://issues.apache.org/jira/browse/FLINK-2617
[jira] [Commented] (FLINK-2617) ConcurrentModificationException when using HCatRecordReader to access a hive table
[ https://issues.apache.org/jira/browse/FLINK-2617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14729289#comment-14729289 ] Arnaud Linz commented on FLINK-2617: (I think that the subject is in fact critical, because it happens really often in my cluster as soon as parallelism is > 250)
> ConcurrentModificationException when using HCatRecordReader to access a hive table
> Key: FLINK-2617
> URL: https://issues.apache.org/jira/browse/FLINK-2617
[jira] [Updated] (FLINK-2617) ConcurrentModificationException when using HCatRecordReader to access a hive table
[ https://issues.apache.org/jira/browse/FLINK-2617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arnaud Linz updated FLINK-2617: --- Priority: Critical (was: Major)
> ConcurrentModificationException when using HCatRecordReader to access a hive table
> Key: FLINK-2617
> URL: https://issues.apache.org/jira/browse/FLINK-2617
[jira] [Created] (FLINK-2617) ConcurrentModificationException when using HCatRecordReader to access a hive table
Arnaud Linz created FLINK-2617:
--
Summary: ConcurrentModificationException when using HCatRecordReader to access a hive table
Key: FLINK-2617
URL: https://issues.apache.org/jira/browse/FLINK-2617
Project: Flink
Issue Type: Bug
Components: Hadoop Compatibility
Reporter: Arnaud Linz

I don't know if it's an HCat or a Flink problem, but when reading a hive table in a cluster with many slots (20 threads per container), I systematically run into a {{ConcurrentModificationException}} in a copy method of a {{Configuration}} object that changes during the copy.

From what I understand, this object comes from {{TaskAttemptContext.getConfiguration()}}, created by {{HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());}}

Maybe the {{job.Configuration}} object passed to the constructor of {{HadoopInputFormatBase}} should be cloned somewhere?

Stack trace is:
{code}
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
org.apache.flink.client.program.Client.run(Client.java:413)
org.apache.flink.client.program.Client.run(Client.java:356)
org.apache.flink.client.program.Client.run(Client.java:349)
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:73)
com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:69)
com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:50)
com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:88)
com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
org.apache.flink.client.program.Client.run(Client.java:315)
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
akka.actor.Actor$class.aroundReceive(Actor.scala:465)
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
akka.actor.ActorCell.invoke(ActorCell.scala:487)
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
akka.dispatch.Mailbox.run(Mailbox.scala:221)
akka.dispatch.Mailbox.exec(Mailbox.scala:231)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.ConcurrentModificationException
java.util.HashMap$HashIterator.nextEntry(HashMap.java:926)
java.util.HashMap$KeyIterator.next(HashMap.java:960)
java.util.AbstractCollection.addAll(AbstractCollection.java:341)
java.util.HashSet.<init>(HashSet.java:117)
org.apache.hadoop.conf.Configuration.<init>(Configuration.java:554)
org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:439)
org.apache.hive.hcatalog.common.HCatUtil.getJobConfFromContext(HCatUtil.java:637)
org.apache.hive.hcatalog.mapreduce.HCatRecordReader.createBaseRecordReader(HCatRecordReader.java:112)
org.apache.hive.hcatalog.mapreduce.HCatRecordReader.initialize(HCatRecordReader.java:91)
{code}
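The failure mode can be reproduced without Hadoop at all: copying a HashMap-backed property map while another thread mutates it is exactly what the {{Configuration}} copy constructor does when it throws. Below is a minimal Java sketch of the defensive-clone idea suggested in the ticket, a per-task snapshot taken under the same lock as mutation. {{ConfigCopyRace}}, {{set}} and {{snapshot}} are illustrative names, not Flink's or Hadoop's actual API.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the race behind FLINK-2617: one thread mutates a
// Configuration-like property map while task threads copy it.
public class ConfigCopyRace {
    private final Map<String, String> props = new HashMap<>();
    private final Object lock = new Object();

    public void set(String key, String value) {
        synchronized (lock) { props.put(key, value); }
    }

    // Defensive per-task copy taken under the same lock as mutation; this is
    // the kind of clone the ticket suggests HadoopInputFormatBase could make
    // before handing the Configuration to concurrent readers.
    public Map<String, String> snapshot() {
        synchronized (lock) { return new HashMap<>(props); }
    }

    public static void main(String[] args) throws Exception {
        final ConfigCopyRace conf = new ConfigCopyRace();
        Thread mutator = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) conf.set("k" + i, "v");
        });
        mutator.start();
        // An unsynchronized `new HashMap<>(props)` here could throw
        // ConcurrentModificationException; the locked snapshot cannot.
        for (int i = 0; i < 200; i++) conf.snapshot();
        mutator.join();
        System.out.println("copies ok, final size=" + conf.snapshot().size());
    }
}
```

Note the trade-off: locking every read of a shared configuration is costly, which is why cloning once per task attempt (rather than sharing one mutable object) is the cleaner fix.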
[jira] [Created] (FLINK-2589) Threads created in TimeTriggerPolicy don't end properly
Arnaud Linz created FLINK-2589:
--
Summary: Threads created in TimeTriggerPolicy don't end properly
Key: FLINK-2589
URL: https://issues.apache.org/jira/browse/FLINK-2589
Project: Flink
Issue Type: Bug
Components: Streaming
Reporter: Arnaud Linz
Priority: Minor

TimeTriggerPolicy uses a thread (TimeCheck) to push fake events in case of a timeout. However, this thread implements an infinite loop and ignores InterruptedExceptions: it never ends properly. Once created, it keeps pushing fake events even after the execution is over, polluting the standard error with stack traces because posting the fake element fails. This especially occurs in unit tests using local clusters, because the JVM does not end.

Stack trace extract:
{code}
java.lang.RuntimeException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OutputHandler$ChainingOutput.collect(OutputHandler.java:291)
at org.apache.flink.streaming.runtime.tasks.OutputHandler$ChainingOutput.collect(OutputHandler.java:276)
at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:50)
at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:30)
at org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:44)
at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:57)
at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:62)
at org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:47)
at org.apache.flink.streaming.runtime.tasks.OutputHandler$ChainingOutput.collect(OutputHandler.java:288)
... 9 more
Caused by: java.lang.RuntimeException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OutputHandler$ChainingOutput.collect(OutputHandler.java:291)
at org.apache.flink.streaming.runtime.tasks.OutputHandler$ChainingOutput.collect(OutputHandler.java:276)
at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:50)
at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:30)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:37)
at org.apache.flink.streaming.runtime.tasks.OutputHandler$ChainingOutput.collect(OutputHandler.java:288)
... 17 more
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:37)
at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:50)
at org.apache.flink.streaming.runtime.io.CollectorWrapper.collect(CollectorWrapper.java:30)
at org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:43)
at org.apache.flink.streaming.runtime.tasks.OutputHandler$ChainingOutput.collect(OutputHandler.java:288)
... 22 more
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:144)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:75)
... 27 more
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
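A hedged sketch of how such a timer loop can end properly: treat interruption as a shutdown signal instead of swallowing it inside an endless loop. {{TimeTrigger}} and its members are illustrative names standing in for the TimeCheck thread, not Flink's actual classes.

```java
// Interrupt-friendly version of a periodic trigger loop: the thread exits
// cleanly when interrupted instead of pushing fake elements forever and
// polluting stderr once the buffer pool is destroyed.
public class TimeTrigger implements Runnable {
    private final long periodMillis;
    private volatile int ticks = 0;

    public TimeTrigger(long periodMillis) { this.periodMillis = periodMillis; }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Thread.sleep(periodMillis);
                ticks++; // stand-in for emitting a fake timing element
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and fall through: the thread ends
            // instead of looping on, so the JVM can shut down in local tests.
            Thread.currentThread().interrupt();
        }
    }

    public int getTicks() { return ticks; }

    public static void main(String[] args) throws Exception {
        TimeTrigger trigger = new TimeTrigger(10);
        Thread t = new Thread(trigger);
        t.start();
        Thread.sleep(50);
        t.interrupt(); // execution is over: ask the trigger to stop
        t.join(1000);
        System.out.println("alive=" + t.isAlive() + " ticks=" + trigger.getTicks());
    }
}
```

Marking the thread as a daemon would also let the JVM exit, but exiting on interrupt is the cleaner fix since it also stops the failed-post stack traces.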
[jira] [Created] (FLINK-2579) StreamExecutionEnvironment and ExecutionEnvironment do not share an interface but have a lot in common
Arnaud Linz created FLINK-2579:
--
Summary: StreamExecutionEnvironment and ExecutionEnvironment do not share an interface but have a lot in common
Key: FLINK-2579
URL: https://issues.apache.org/jira/browse/FLINK-2579
Project: Flink
Issue Type: Improvement
Components: Core
Reporter: Arnaud Linz
Priority: Minor

Both classes org.apache.flink.streaming.api.environment.StreamExecutionEnvironment and org.apache.flink.api.java.ExecutionEnvironment have a lot in common (same methods for Kryo registration, fromCollection, etc.) but are not related by any Java contract. That leads to annoying differences; for instance, StreamExecutionEnvironment.setParallelism() returns 'this', whereas ExecutionEnvironment.setParallelism() has no return value. They have their specificities, but maybe they should both implement a common interface to make sure that the common signatures stay coherent?
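One way to reconcile the fluent and non-fluent setters is a shared contract with a self-referential type parameter, so each concrete environment's setters return its own type. This is only a sketch of the proposal; {{CommonEnvironment}}, {{BatchEnv}} and {{StreamEnv}} are hypothetical names, not Flink's real API.

```java
// A common contract for both environment flavours. The self-type parameter
// lets setParallelism() stay fluent while returning the concrete subtype.
interface CommonEnvironment<T extends CommonEnvironment<T>> {
    T setParallelism(int parallelism);
    int getParallelism();
}

// Stand-in for ExecutionEnvironment (batch).
class BatchEnv implements CommonEnvironment<BatchEnv> {
    private int parallelism = 1;
    public BatchEnv setParallelism(int p) { this.parallelism = p; return this; }
    public int getParallelism() { return parallelism; }
}

// Stand-in for StreamExecutionEnvironment (streaming).
class StreamEnv implements CommonEnvironment<StreamEnv> {
    private int parallelism = 1;
    public StreamEnv setParallelism(int p) { this.parallelism = p; return this; }
    public int getParallelism() { return parallelism; }
}
```

With this shape, code generic over {{CommonEnvironment}} can chain calls on either environment without losing the concrete type.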
[jira] [Created] (FLINK-2580) HadoopDataOutputStream does not expose enough methods of org.apache.hadoop.fs.FSDataOutputStream
Arnaud Linz created FLINK-2580:
--
Summary: HadoopDataOutputStream does not expose enough methods of org.apache.hadoop.fs.FSDataOutputStream
Key: FLINK-2580
URL: https://issues.apache.org/jira/browse/FLINK-2580
Project: Flink
Issue Type: Improvement
Components: Hadoop Compatibility
Reporter: Arnaud Linz
Priority: Minor

I've noticed that when you use org.apache.flink.core.fs.FileSystem to write into an hdfs file, calling org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(), it returns a HadoopDataOutputStream that wraps an org.apache.hadoop.fs.FSDataOutputStream (under its org.apache.hadoop.hdfs.client.HdfsDataOutputStream wrapper). However, FSDataOutputStream exposes many methods like flush(), getPos(), etc., but HadoopDataOutputStream only wraps write() and close(). For instance, flush() calls the default, empty implementation of OutputStream instead of the hadoop one, and that's confusing. Moreover, because of the restrictive OutputStream interface, hsync() and hflush() are not exposed to Flink. I see two options:
- complete the class to wrap all methods of OutputStream and add a getWrappedStream() to access other stuff like hsync();
- get rid of the Hadoop wrapping and directly use Hadoop file system objects.
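The first option can be sketched with plain java.io types: a wrapper that forwards every OutputStream method to the wrapped stream (not just write and close) and exposes the underlying stream for Hadoop-specific calls such as hsync(). {{ForwardingDataOutputStream}} and {{getWrappedStream()}} are illustrative names for the proposal, not Flink's actual class.

```java
import java.io.IOException;
import java.io.OutputStream;

// Sketch of option 1: forward *all* OutputStream methods to the wrapped
// stream and expose it for callers that need richer APIs (hsync, getPos...).
public class ForwardingDataOutputStream extends OutputStream {
    private final OutputStream wrapped;

    public ForwardingDataOutputStream(OutputStream wrapped) {
        this.wrapped = wrapped;
    }

    @Override public void write(int b) throws IOException { wrapped.write(b); }

    @Override public void write(byte[] b, int off, int len) throws IOException {
        wrapped.write(b, off, len);
    }

    // Forwarding flush() is the key fix: OutputStream.flush() is a no-op,
    // so without this override nothing would reach the underlying client.
    @Override public void flush() throws IOException { wrapped.flush(); }

    @Override public void close() throws IOException { wrapped.close(); }

    // Escape hatch for methods the OutputStream interface cannot express.
    public OutputStream getWrappedStream() { return wrapped; }
}
```

This is essentially what java.io.FilterOutputStream already does for the standard methods; the addition here is the accessor for the wrapped stream.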