Re: Question about Infinite Streaming Job on Mini Cluster and ITCase
Hey Matthias,

Thanks for reporting the exception; we had not prepared for this use case yet. Gyula and I fixed it, and he is pushing the fix right now: once this commit [1] is in, you should no longer see that InterruptedException when the job is cancelled (for example, due to shutting down the executor underneath).

As for getting the streaming JobExecutionResult back from a detached job, my current best practice is what you can see in ProcessFailureRecoveryTestBase and its streaming implementation: start an executor in a separate thread and then join it with the main one. Would you prefer a more Storm-example-ish solution? [2]

[1] https://github.com/mbalassi/flink/commit/5db06d6d
[2] https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java#L99-104

On Tue, Mar 31, 2015 at 2:54 PM, Matthias J. Sax mj...@informatik.hu-berlin.de wrote: [...]
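The separate-thread pattern mentioned above can be sketched in plain Java, without a Flink dependency. `blockingExecute` and `runInSeparateThread` are made-up names standing in for a blocking call like `JobClient.submitJobAndWait(...)`; this is a minimal sketch of the idea, not the test base's actual code:

```java
public class DetachedExecutor {
    // Hypothetical stand-in for a blocking call like JobClient.submitJobAndWait(...)
    // that only returns once the job has finished.
    static String blockingExecute() throws InterruptedException {
        Thread.sleep(50);      // simulate the running job
        return "job-result";
    }

    // Run the blocking call in a separate thread; the main thread stays free
    // while the job runs, and picks up the result by joining afterwards.
    static String runInSeparateThread() throws InterruptedException {
        final String[] result = new String[1];
        Thread executor = new Thread(() -> {
            try {
                result[0] = blockingExecute();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        executor.start();
        // ... the main thread can interact with the running job here ...
        executor.join();       // wait for the executor, then read the result
        return result[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runInSeparateThread());  // prints "job-result"
    }
}
```

The join establishes a happens-before edge, so reading the result array after `join()` is safe without further synchronization.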
Re: Question about Infinite Streaming Job on Mini Cluster and ITCase
As a follow-up: I think it would be a good thing to add a way to gracefully stop a streaming job, i.e., something that sends a close signal to the sources so that they quit. We could also use this for graceful shutdown when re-partitioning or scaling in or out.

On Wed, Apr 1, 2015 at 1:29 PM, Matthias J. Sax mj...@informatik.hu-berlin.de wrote:

Hi,

I will pull the fix and try it out. Thanks for the hint with the extra thread; that should work for me. You are right, my setup is Storm-inspired. I think it is a very natural way to deploy and stop an infinite streaming job; maybe you want to adopt it.

The ITCase I am writing is based on StreamingProgramTestBase, so I need the JobExecutionResult because the test fails without it.

-Matthias

On 04/01/2015 11:09 AM, Márton Balassi wrote: [...]
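The graceful-stop idea proposed above can be sketched in plain Java, independent of Flink. `StoppableSource` and its `stop()`/`cancel()` methods are hypothetical names; the point is only the contrast between a stop signal that lets the source finish regularly (emitting any final records) and a hard cancel that simply exits the loop:

```java
import java.util.ArrayList;
import java.util.List;

public class StoppableSource implements Runnable {
    private volatile boolean running = true;        // cleared by cancel()
    private volatile boolean stopRequested = false; // set by stop()
    final List<Integer> emitted = new ArrayList<>();

    @Override
    public void run() {
        int i = 0;
        while (running) {
            if (stopRequested) {
                emitted.add(-1);   // e.g. emit a final end-of-stream marker
                return;            // return regularly, as if the input were finite
            }
            if (i < 3) emitted.add(i++);  // normal record emission
        }
        // cancel() path: the loop just exits, with no end-of-stream handling
    }

    public void stop()   { stopRequested = true; }  // graceful close signal
    public void cancel() { running = false; }       // hard cancellation

    static List<Integer> demo() throws InterruptedException {
        StoppableSource source = new StoppableSource();
        Thread t = new Thread(source);
        t.start();
        Thread.sleep(50);  // let the source emit its records
        source.stop();     // graceful stop instead of cancel()
        t.join();
        return source.emitted;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());  // prints "[0, 1, 2, -1]"
    }
}
```

With `stop()`, the source gets a chance to flush and close its outputs itself, which is exactly what a hard cancel denies it.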
Re: Question about Infinite Streaming Job on Mini Cluster and ITCase
Hi Robert,

thanks for your answer. I get an InterruptedException when I call shutdown():

java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(Thread.java:1225)
        at java.lang.Thread.join(Thread.java:1278)
        at org.apache.flink.streaming.io.StreamRecordWriter.close(StreamRecordWriter.java:55)
        at org.apache.flink.streaming.api.collector.StreamOutput.close(StreamOutput.java:77)
        at org.apache.flink.streaming.api.streamvertex.OutputHandler.flushOutputs(OutputHandler.java:204)
        at org.apache.flink.streaming.api.streamvertex.StreamVertex.invoke(StreamVertex.java:195)
        at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
        at java.lang.Thread.run(Thread.java:701)

About the JobExecutionResult: I added a new method to the API that calls JobClient.submitJobDetached(...) instead of JobClient.submitJobAndWait(...). The detached version has no return value, while the blocking one returns a JobExecutionResult that is in turn returned by execute(). So I cannot get a JobExecutionResult right now.

It would be nice to get the JobExecutionResult when stopping the running program via a stop-execution call (is there any way to do this?). Right now, I sleep for a certain time after calling submitJobDetached(...) and call stop() and shutdown() later on (from ForkableMiniCluster). The stop() call does not seem to do anything; shutdown() works (except for the exception described above).

-Matthias

On 03/30/2015 09:08 PM, Robert Metzger wrote:

Hi Matthias,

the streaming folks can probably answer these questions better, but I'll write something to bring this message back to their attention ;)

1) Which exceptions are you seeing? Flink should be able to shut down cleanly.
2) As far as I can see, the execute() method of the Streaming API got a JobExecutionResult return type in the latest master. It contains the accumulator results.
3) I think the cancel() method is there for exactly that purpose. If the job is shutting down before the cancel method is called, that is probably a bug.

Robert

On Fri, Mar 27, 2015 at 10:07 PM, Matthias J. Sax mj...@informatik.hu-berlin.de wrote: [...]
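The shape of the stack trace above, with `close()` blocked in `Thread.join()` and interrupted by the cancellation, can be reproduced in plain Java. This is a sketch of the mechanism only; the thread names are invented and the real Flink code paths differ:

```java
public class InterruptDuringClose {
    // close() blocks in Thread.join() on a still-busy flusher thread, and
    // cancelling the task interrupts that join, raising InterruptedException.
    static boolean demo() throws InterruptedException {
        Thread flusher = new Thread(() -> {
            try { Thread.sleep(60_000); }       // the flusher is still busy
            catch (InterruptedException e) { }  // cut short when the demo ends
        });
        flusher.start();

        final boolean[] sawInterrupt = {false};
        Thread closer = new Thread(() -> {
            try {
                flusher.join();                 // analogous to a close() that joins
            } catch (InterruptedException e) {
                sawInterrupt[0] = true;         // the reported exception
            }
        });
        closer.start();
        Thread.sleep(100);   // let closer block inside join()
        closer.interrupt();  // cancellation interrupts the blocked close
        closer.join();
        flusher.interrupt(); // clean up the demo thread
        return sawInterrupt[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());  // prints "true"
    }
}
```

This is why a fix on the close path (or a cooperative stop signal) is needed: any interrupt delivered while close() is joining a helper thread surfaces as exactly this exception.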
Question about Infinite Streaming Job on Mini Cluster and ITCase
Hi,

I am trying to run an infinite streaming job (i.e., one that does not terminate, because it generates output data randomly on the fly). I kill this job with the .stop() or .shutdown() method of ForkableFlinkMiniCluster. I did not find any example using a similar setup; in the provided examples, each job terminates automatically, because only a finite input is processed and the source returns after all data is emitted.

I have multiple questions about my setup:

1) The job never terminates cleanly, i.e., I get some exceptions. Is this behavior intended?
2) Is it possible to get a result back, similar to JobClient.submitJobAndWait(...)?
3) Is it somehow possible to send a signal to the running job such that the source can terminate regularly, as if finite input were processed? Right now, I use a while(running) loop and set 'running' to false in the .cancel() method.

Thanks for your help!

-Matthias
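The while(running)/cancel() pattern described in question 3 can be sketched in plain Java (no Flink dependency; `CancellableSource` and its methods are illustrative names, not Flink API):

```java
public class CancellableSource implements Runnable {
    private volatile boolean running = true;  // volatile so cancel() is seen by run()
    private long count = 0;

    @Override
    public void run() {
        while (running) {   // infinite loop: generate output data on the fly
            count++;        // stand-in for emitting a random record
        }
    }

    public void cancel() {  // called on job cancellation
        running = false;
    }

    static long runAndCancel() throws InterruptedException {
        CancellableSource src = new CancellableSource();
        Thread t = new Thread(src);
        t.start();
        Thread.sleep(50);
        src.cancel();       // signal the loop to exit
        t.join();           // the thread terminates cleanly, no interrupt needed
        return src.count;   // join() makes the final count visible to this thread
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndCancel() > 0);  // prints "true"
    }
}
```

The flag must be volatile; otherwise the emitting thread may never observe the write made by cancel() and the loop can spin forever.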