[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147525#comment-15147525
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-184273250
  
Thanks for your great work and patience @mjsax. I know this PR lingered 
around for far too long, but I think we have now merged a really good first 
version :-)

I also wanted to complete the 300 interactions on this PR thread ;-)


> Add "stop" signal to cleanly shutdown streaming jobs
> 
>
> Key: FLINK-2111
> URL: https://issues.apache.org/jira/browse/FLINK-2111
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime, JobManager, Local Runtime, 
> Streaming, TaskManager, Webfrontend
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> Currently, streaming jobs can only be stopped using the "cancel" command, which 
> is a "hard" stop with no clean shutdown.
> The newly introduced "stop" signal will only affect streaming source tasks, 
> such that the sources can stop emitting data and shut down cleanly, resulting 
> in a clean shutdown of the whole streaming job.
> This feature is a prerequisite for 
> https://issues.apache.org/jira/browse/FLINK-1929



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147487#comment-15147487
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-184254082
  
Thanks for merging! Very happy about it!




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147229#comment-15147229
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-184171212
  
Thanks for the review @StephanEwen. I will take care of your comments and 
then merge the PR :-)




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147205#comment-15147205
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-184165224
  
This looks pretty good by now. I have a few remaining comments, otherwise 
this looks good to go:

  1. There is a duplicate and incorrect test dependency in `flink-tests` 
(critical, will break the build later)

  2. There are two types of exceptions:
  - ProgramStopException
  - StoppingException 
 Both simply extend `Exception`. Can we consolidate these into one 
class?

  3. The JobManager tries to stop the job in state `CREATED`, `RUNNING`, 
and `RESTARTING`. Are we sure that the states `CREATED` and `RESTARTING` behave 
well?

  4. Minor comment: Adding the generic `SRC` parameter to the 
`SourceStreamTask` looks like a bit of overkill for only adding the 
`StoppableFunction` interface to the generic function type. A simple cast in 
`StoppableSourceStreamTask.stop()` would probably be the more lightweight 
change.
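Point 4 can be illustrated with a minimal Java sketch of the cast-based alternative. All types below (`SourceFunction`, `StoppableFunction`, `StoppableTask`, `SourceStreamTask`, `StoppableSourceStreamTask`) are simplified, hypothetical stand-ins named after the classes discussed in this thread, not Flink's actual signatures; the `userFunction` field in particular is an assumption.

```java
public class CastInsteadOfGenericSketch {

    // Hypothetical, simplified stand-ins; not Flink's real interfaces.
    interface SourceFunction {
        void run();
        void cancel();
    }

    interface StoppableFunction {
        void stop();
    }

    interface StoppableTask {
        void stop();
    }

    // No extra SRC type parameter: the task only knows it holds a SourceFunction.
    static class SourceStreamTask {
        protected final SourceFunction userFunction;

        SourceStreamTask(SourceFunction userFunction) {
            this.userFunction = userFunction;
        }
    }

    static class StoppableSourceStreamTask extends SourceStreamTask implements StoppableTask {

        StoppableSourceStreamTask(SourceFunction userFunction) {
            super(userFunction);
            // Check once up front so the cast in stop() cannot fail later.
            if (!(userFunction instanceof StoppableFunction)) {
                throw new IllegalArgumentException("Source function does not implement StoppableFunction");
            }
        }

        @Override
        public void stop() {
            // The "simple cast" suggested in the review comment.
            ((StoppableFunction) userFunction).stop();
        }
    }
}
```

With this shape, `SourceStreamTask` needs no additional type parameter; the trade-off is that a non-stoppable source is rejected when the task is constructed rather than at compile time.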




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15147194#comment-15147194
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52883504
  
--- Diff: flink-tests/pom.xml ---
@@ -141,6 +149,14 @@ under the License.
 

org.apache.flink
+   flink-runtime-web
--- End diff --

Duplicate dependency. Also, this dependency is missing the Scala version suffix.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144735#comment-15144735
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-183382789
  
I will merge your PR and take care of the other issues you mentioned. 
Should I squash afterwards to get a potential final state of this PR that we 
might merge?




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144754#comment-15144754
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-183389629
  
Let's squash them once we merge the PR. Otherwise, changes are hard to 
track. 




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144757#comment-15144757
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-183389977
  
About vendor.css etc.: I guess I did not commit after the last run of gulp, 
so the committed versions are dirty... It worked for me; I guess you would 
need to test it again after I commit.





[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144809#comment-15144809
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-183400616
  
Updated. I did not apply your last two comments about `@SuppressWarnings` 
and `@SuppressWarnings("serial")`, because this commit should not touch those 
files (no unrelated changes within a commit).




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15144765#comment-15144765
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-183390556
  
Alright, will do.

On Fri, Feb 12, 2016 at 5:10 PM, Matthias J. Sax wrote:

> About vendor.css etc (I guess I did not commit after the last run of
> gulp), so the committed version are dirty... It worked for me -- I guess it
> would be required that you test it again after I committed.





[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15142877#comment-15142877
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-182915371
  
It seems that reverting `vendor.css` and `vendor.js` to the latest master 
version does the trick :-)

I've tested the stop functionality locally and on YARN using both the CLI 
and the web interface. Works like a charm.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15142675#comment-15142675
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52597604
  
--- Diff: docs/apis/cli.md ---
@@ -248,6 +252,16 @@ Action "cancel" cancels a running program.
configuration.
 
 
+Action "stop" stops a running program (streaming jobs only).
--- End diff --

Maybe we should add that it is a stop signal without strong consistency 
guarantees.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15142693#comment-15142693
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52598719
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -464,6 +464,33 @@ class JobManager(
   )
   }
 
+case StopJob(jobID) =>
+  log.info(s"Trying to stop job with ID $jobID.")
+
+  currentJobs.get(jobID) match {
+case Some((executionGraph, _)) =>
+  try {
+if (!executionGraph.isStoppable()) {
+  sender ! StoppingFailure(jobID, new IllegalStateException(s"Job with ID $jobID" +
+" is not stoppable."))
+} else if(executionGraph.getState() != JobStatus.CREATED
+&& executionGraph.getState() != JobStatus.RUNNING
+&& executionGraph.getState() != JobStatus.RESTARTING) {
+  sender ! StoppingFailure(jobID, new IllegalStateException(s"Job with ID $jobID" +
+"is not in state CREATED, RUNNING, or RESTARTING."))
+} else {
+  executionGraph.stop()
+  sender ! StoppingSuccess(jobID)
+}
+  } catch {
+case t: Throwable =>  sender ! StoppingFailure(jobID, t)
--- End diff --

`decorateMessage`




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15142692#comment-15142692
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52598713
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -464,6 +464,33 @@ class JobManager(
   )
   }
 
+case StopJob(jobID) =>
+  log.info(s"Trying to stop job with ID $jobID.")
+
+  currentJobs.get(jobID) match {
+case Some((executionGraph, _)) =>
+  try {
+if (!executionGraph.isStoppable()) {
+  sender ! StoppingFailure(jobID, new IllegalStateException(s"Job with ID $jobID" +
+" is not stoppable."))
+} else if(executionGraph.getState() != JobStatus.CREATED
+&& executionGraph.getState() != JobStatus.RUNNING
+&& executionGraph.getState() != JobStatus.RESTARTING) {
+  sender ! StoppingFailure(jobID, new IllegalStateException(s"Job with ID $jobID" +
+"is not in state CREATED, RUNNING, or RESTARTING."))
+} else {
+  executionGraph.stop()
+  sender ! StoppingSuccess(jobID)
--- End diff --

`decorateMessage`




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15142668#comment-15142668
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52597099
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.api.common.functions;
+
+/**
+ * Must be implemented by stoppable functions, eg, source functions of streaming jobs. The method {@link #stop()} will
+ * be called when the job received the STOP signal. On this signal, the source function must stop emitting new data and
+ * terminate gracefully.
+ */
+public interface StoppableFunction {
+   /**
+    * Stops the source. In contrast to {@code cancel()} this is a request to the source function to shut down
+    * gracefully. Pending data can still be emitted and it is not required to stop immediately -- however, in the near
+    * future. The job will keep running until all emitted data is processed completely.
+    *
+    * Most streaming sources will have a while loop inside the {@code run()} method. You need to ensure that the source
+    * will break out of this loop. This can be achieved by having a volatile field "isRunning" that is checked in the
+    * loop and that is set to false in this method.
+    *
+    * The call to {@code stop()} should not block and not throw any exception.
+    */
+   public void stop();
--- End diff --

interface methods are always public
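The Javadoc above describes a common pattern: a `volatile` flag checked by the loop in `run()` and cleared by `stop()`. A minimal, self-contained Java sketch of that pattern follows. It deliberately does not depend on Flink: `StoppableFunction` here is a local stand-in with the same shape as the interface in the diff, and `CountingSource` with its `run(LongConsumer)` method is a hypothetical source, not Flink's `SourceFunction` API.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

public class StoppableSourceSketch {

    // Local stand-in with the same shape as the interface in the diff (not imported from Flink).
    interface StoppableFunction {
        void stop(); // implicitly public, as noted in the review
    }

    // Hypothetical source that emits increasing numbers until it is stopped.
    static class CountingSource implements StoppableFunction {
        // volatile: stop() is called from another thread and run() must observe the write
        private volatile boolean isRunning = true;

        void run(LongConsumer out) {
            long next = 0;
            while (isRunning) {
                out.accept(next++);
            }
            // Leaving the loop lets run() return normally instead of being interrupted.
        }

        @Override
        public void stop() {
            // Non-blocking and exception-free, as the Javadoc asks.
            isRunning = false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountingSource source = new CountingSource();
        AtomicLong emitted = new AtomicLong();
        Thread worker = new Thread(() -> source.run(v -> emitted.incrementAndGet()));
        worker.start();
        Thread.sleep(50);
        source.stop();      // the STOP signal
        worker.join(5000);  // wait for run() to return
        System.out.println("run() returned: " + !worker.isAlive() + ", emitted: " + emitted.get());
    }
}
```

Calling `stop()` only clears the flag; the loop notices it on its next check, so the record currently being emitted still completes, which matches the Javadoc's "not required to stop immediately".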




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15142690#comment-15142690
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52598636
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -464,6 +464,33 @@ class JobManager(
   )
   }
 
+case StopJob(jobID) =>
+  log.info(s"Trying to stop job with ID $jobID.")
+
+  currentJobs.get(jobID) match {
+case Some((executionGraph, _)) =>
+  try {
+if (!executionGraph.isStoppable()) {
+  sender ! StoppingFailure(jobID, new IllegalStateException(s"Job with ID $jobID" +
--- End diff --

`decorateMessage` is missing




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15142694#comment-15142694
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52598725
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -464,6 +464,33 @@ class JobManager(
   )
   }
 
+case StopJob(jobID) =>
+  log.info(s"Trying to stop job with ID $jobID.")
+
+  currentJobs.get(jobID) match {
+case Some((executionGraph, _)) =>
+  try {
+if (!executionGraph.isStoppable()) {
+  sender ! StoppingFailure(jobID, new IllegalStateException(s"Job with ID $jobID" +
+" is not stoppable."))
+} else if(executionGraph.getState() != JobStatus.CREATED
+&& executionGraph.getState() != JobStatus.RUNNING
+&& executionGraph.getState() != JobStatus.RESTARTING) {
+  sender ! StoppingFailure(jobID, new IllegalStateException(s"Job with ID $jobID" +
+"is not in state CREATED, RUNNING, or RESTARTING."))
+} else {
+  executionGraph.stop()
+  sender ! StoppingSuccess(jobID)
+}
+  } catch {
+case t: Throwable =>  sender ! StoppingFailure(jobID, t)
+  }
+case None =>
+  log.info(s"No job found with ID $jobID.")
+  sender ! StoppingFailure(jobID, new IllegalArgumentException("No job found with " +
--- End diff --

`decorateMessage`
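The `decorateMessage` comments on this diff all point at the same omission: every reply sent from the `StopJob` branch should be decorated. One way to make that hard to forget is to route all replies through a single helper. The Java sketch below mirrors the branch structure of the quoted Scala diff; `Graph`, `StoppingSuccess`, `StoppingFailure`, and the `decorateMessage`/`sender` function parameters are hypothetical stand-ins for the actor machinery, not Flink's API, and the error strings are illustrative. Keeping the accepted states (`CREATED`, `RUNNING`, `RESTARTING`) in one `EnumSet` also makes the question raised in point 3 of the later review easy to revisit. Note that in the quoted diff the second `IllegalStateException` message is built as `s"Job with ID $jobID" + "is not in state ..."`, which lacks a space; the sketch adds one.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

public class StopJobSketch {

    // Hypothetical stand-in for the job status enum (subset only).
    enum JobStatus { CREATED, RUNNING, RESTARTING, CANCELLING, CANCELED, FAILED, FINISHED }

    // States in which the diff accepts a stop request.
    static final Set<JobStatus> STOPPABLE_STATES =
            EnumSet.of(JobStatus.CREATED, JobStatus.RUNNING, JobStatus.RESTARTING);

    interface Graph {
        boolean isStoppable();
        JobStatus getState();
        void stop();
    }

    static final class StoppingSuccess {
        final String jobId;
        StoppingSuccess(String jobId) { this.jobId = jobId; }
    }

    static final class StoppingFailure {
        final String jobId;
        final Throwable cause;
        StoppingFailure(String jobId, Throwable cause) { this.jobId = jobId; this.cause = cause; }
    }

    static void handleStopJob(String jobId, Map<String, Graph> currentJobs,
                              UnaryOperator<Object> decorateMessage, Consumer<Object> sender) {
        // Single reply path: every message is decorated before it is sent.
        Consumer<Object> reply = msg -> sender.accept(decorateMessage.apply(msg));

        Graph graph = currentJobs.get(jobId);
        if (graph == null) {
            reply.accept(new StoppingFailure(jobId,
                    new IllegalArgumentException("No job found with ID " + jobId + ".")));
            return;
        }
        try {
            if (!graph.isStoppable()) {
                reply.accept(new StoppingFailure(jobId,
                        new IllegalStateException("Job with ID " + jobId + " is not stoppable.")));
            } else if (!STOPPABLE_STATES.contains(graph.getState())) {
                reply.accept(new StoppingFailure(jobId, new IllegalStateException(
                        "Job with ID " + jobId + " is not in state CREATED, RUNNING, or RESTARTING.")));
            } else {
                graph.stop();
                reply.accept(new StoppingSuccess(jobId));
            }
        } catch (RuntimeException e) {
            // The Scala diff catches Throwable; this sketch only catches unchecked exceptions.
            reply.accept(new StoppingFailure(jobId, e));
        }
    }
}
```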




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15142681#comment-15142681
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52598106
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java
 ---
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.jobgraph.tasks;
+
+/**
+ * Implemented by tasks that can receive STOP signal.
+ */
+public interface StoppableTask {
+   /** Called on STOP signal. */
+   public void stop();
--- End diff --

public is not required here




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15142688#comment-15142688
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52598511
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -417,6 +417,28 @@ class TaskManager(
 log.debug(s"Cannot find task to fail for execution 
$executionID)")
   }
 
+// stops a task
+case StopTask(executionID) =>
+  val task = runningTasks.get(executionID)
+  if (task != null) {
+try {
+  task.stopExecution()
+  sender ! decorateMessage(new 
TaskOperationResult(executionID, true))
+} catch {
+  case t: Throwable =>
+sender ! new TaskOperationResult(executionID, 
false,
--- End diff --

`decorateMessage` is missing here.
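The fix is to route the failure reply through `decorateMessage` exactly like the success reply. The Java sketch below mirrors that control flow under stated assumptions: `decorateMessage` here simply wraps a reply with a leader session ID (an assumption about what the real helper does), and `TaskOperationResult`, `LeaderSessionMessage`, and `StopTaskHandler` are simplified hypothetical stand-ins, not Flink's actual classes.

```java
import java.util.UUID;

// Simplified stand-in for the reply sent back to the JobManager.
final class TaskOperationResult {
    final String executionId;
    final boolean success;
    final String reason;

    TaskOperationResult(String executionId, boolean success, String reason) {
        this.executionId = executionId;
        this.success = success;
        this.reason = reason;
    }
}

// Assumed decoration: a reply tagged with the current leader session ID.
final class LeaderSessionMessage {
    final UUID leaderSessionId;
    final Object message;

    LeaderSessionMessage(UUID leaderSessionId, Object message) {
        this.leaderSessionId = leaderSessionId;
        this.message = message;
    }
}

final class StopTaskHandler {
    private final UUID leaderSessionId;

    StopTaskHandler(UUID leaderSessionId) {
        this.leaderSessionId = leaderSessionId;
    }

    Object decorateMessage(Object message) {
        return new LeaderSessionMessage(leaderSessionId, message);
    }

    /** Runs the stop action and returns a decorated reply for both outcomes. */
    Object handleStop(String executionId, Runnable stopExecution) {
        try {
            stopExecution.run();
            return decorateMessage(new TaskOperationResult(executionId, true, null));
        } catch (Throwable t) {
            // The failure path is decorated as well; an undecorated reply
            // would not carry the leader session ID the receiver expects.
            return decorateMessage(new TaskOperationResult(executionId, false, t.getMessage()));
        }
    }
}
```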




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15142691#comment-15142691
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52598649
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -464,6 +464,33 @@ class JobManager(
   )
   }
 
+case StopJob(jobID) =>
+  log.info(s"Trying to stop job with ID $jobID.")
+
+  currentJobs.get(jobID) match {
+case Some((executionGraph, _)) =>
+  try {
+if (!executionGraph.isStoppable()) {
+  sender ! StoppingFailure(jobID, new 
IllegalStateException(s"Job with ID $jobID" +
+" is not stoppable."))
+} else if(executionGraph.getState() != JobStatus.CREATED
+&& executionGraph.getState() != JobStatus.RUNNING
+&& executionGraph.getState() != JobStatus.RESTARTING) {
+  sender ! StoppingFailure(jobID, new 
IllegalStateException(s"Job with ID $jobID" +
--- End diff --

`decorateMessage` is missing




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15142684#comment-15142684
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52598306
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -464,6 +464,33 @@ class JobManager(
   )
   }
 
+case StopJob(jobID) =>
+  log.info(s"Trying to stop job with ID $jobID.")
+
+  currentJobs.get(jobID) match {
+case Some((executionGraph, _)) =>
+  try {
+if (!executionGraph.isStoppable()) {
+  sender ! StoppingFailure(jobID, new 
IllegalStateException(s"Job with ID $jobID" +
+" is not stoppable."))
+} else if(executionGraph.getState() != JobStatus.CREATED
+&& executionGraph.getState() != JobStatus.RUNNING
+&& executionGraph.getState() != JobStatus.RESTARTING) {
+  sender ! StoppingFailure(jobID, new 
IllegalStateException(s"Job with ID $jobID" +
--- End diff --

Maybe we could say what the current state of the job is and that it cannot 
be stopped because of that.
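One way to act on this suggestion, sketched in Java: put the job's current state and the accepted states into the exception message. The three accepted states mirror the `CREATED`/`RUNNING`/`RESTARTING` check in the diff. `JobStatus` here is a simplified stand-in that may differ from Flink's actual enum, and `StopPrecondition` is a hypothetical helper.

```java
import java.util.EnumSet;

// Simplified stand-in for Flink's JobStatus enum; the real enum may differ.
enum JobStatus { CREATED, RUNNING, FAILING, FAILED, CANCELLING, CANCELED, FINISHED, RESTARTING }

final class StopPrecondition {
    // States from which the JobManager code in the diff accepts a STOP request.
    private static final EnumSet<JobStatus> STOPPABLE_STATES =
            EnumSet.of(JobStatus.CREATED, JobStatus.RUNNING, JobStatus.RESTARTING);

    /** Throws IllegalStateException with a message that names the job's current state. */
    static void checkCanStop(String jobId, boolean jobIsStoppable, JobStatus currentState) {
        if (!jobIsStoppable) {
            throw new IllegalStateException("Job with ID " + jobId + " is not stoppable.");
        }
        if (!STOPPABLE_STATES.contains(currentState)) {
            throw new IllegalStateException("Job with ID " + jobId + " is in state " + currentState
                    + " and therefore cannot be stopped (stoppable states: " + STOPPABLE_STATES + ").");
        }
    }
}
```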




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15142704#comment-15142704
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-182859453
  
Really good work @mjsax.

I had a couple of minor inline comments with respect to the latest commit 
and some with respect to the overall changes (especially concerning the 
`decorateMessage` method).

Why have you changed the `vendor.js` and `index.js` files? I thought 
that these files should stay unchanged.

I was wondering whether we shouldn't make the `StoppableSourceStreamTask` 
and the `StoppableStreamSource` type-safe, i.e. enforce that the 
`StreamSource` and the `SourceFunction` are actually stoppable. Currently, this 
is not properly checked or enforced; this should be done by the compiler.
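A minimal Java sketch of compiler-enforced stoppability, assuming a generic bound with an intersection type. `SourceFunction`, `StoppableFunction`, `StreamSource`, `StoppableStreamSource`, and `BoundedCounter` below are simplified hypothetical stand-ins, not Flink's actual signatures.

```java
import java.util.function.Consumer;

// Minimal stand-ins for the source interfaces (hypothetical, simplified signatures).
interface SourceFunction<T> {
    void run(Consumer<T> out);
    void cancel();
}

interface StoppableFunction {
    void stop();
}

class StreamSource<T, F extends SourceFunction<T>> {
    protected final F userFunction;

    StreamSource(F userFunction) {
        this.userFunction = userFunction;
    }
}

// The intersection bound makes the compiler reject any F that is not also a StoppableFunction,
// so stop() can be forwarded without a cast or an instanceof check.
class StoppableStreamSource<T, F extends SourceFunction<T> & StoppableFunction>
        extends StreamSource<T, F> {

    StoppableStreamSource(F userFunction) {
        super(userFunction);
    }

    void stop() {
        userFunction.stop();
    }
}

// Example user source that opts in to stopping.
class BoundedCounter implements SourceFunction<Long>, StoppableFunction {
    private volatile boolean running = true;
    private final long limit;

    BoundedCounter(long limit) {
        this.limit = limit;
    }

    @Override
    public void run(Consumer<Long> out) {
        for (long i = 0; running && i < limit; i++) {
            out.accept(i);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void stop() {
        running = false;
    }
}
```

With this design, `new StoppableStreamSource<Long, BoundedCounter>(new BoundedCounter(10))` compiles. Passing a function that implements only `SourceFunction` is a compile-time error rather than a runtime check.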




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15142727#comment-15142727
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52601121
  
--- Diff: flink-runtime-web/web-dashboard/web/css/vendor.css ---
@@ -7704,6 +7704,7 @@ button.close {
 .modal-header {
   padding: 15px;
   border-bottom: 1px solid #e5e5e5;
+  min-height: 16.42857143px;
--- End diff --

If I'm not mistaken, the `vendor.css` file is generated and should not 
be changed. CSS style changes should go to `app/styles/`.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15142792#comment-15142792
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-182884939
  
The WebUI is not shown when I build the PR and start a local cluster. Does 
it work for you?




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15142890#comment-15142890
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-182917968
  
I've opened a PR against your branch which makes the 
`StoppableSourceStreamTask` and the `StoppableStreamSource` type-safe. 
Furthermore, it fixes the problem with the web dashboard files.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15140842#comment-15140842
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-182393013
  
@tillrohrmann just updated this. Let's see if you think it is ready now... 
;)




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15136892#comment-15136892
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-181350773
  
I strongly agree with @StephanEwen that we should not make the distinction 
between streaming and batch jobs explicit in the runtime. Even though this might 
only be a cosmetic change for this PR, who knows what other people will build on 
top of it once it is there. I fear that we won't be able to remove such a flag 
once it has been introduced.

Concerning whether all `SourceFunction`s should have a `stop` method: I 
think it is not a priori clear that all streaming sources are gracefully 
stoppable. Take, for example, the `FlinkKafkaConsumer08`. Currently, the stop 
method is implemented by calling `cancel`. However, `cancel` does not guarantee 
that the sources stop gracefully: exceptions might be thrown and the state 
might be inconsistent. In fact, with the current implementation, `stop` won't 
stop the `FlinkKafkaConsumer08` at all. The `cancel` call only works in 
combination with the `TaskCanceler`, which interrupts the `Task` thread (in this 
case `LegacyFetcher.run`). If the interrupt does not happen, the 
`SimpleConsumerThreads` might be stuck forever in the fetch loop.
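To illustrate the failure mode described above, here is a hypothetical Java fetch loop. It is not the actual `LegacyFetcher` code, and a `BlockingQueue` stands in for the broker connection. Because it waits with a bounded `poll` instead of an unbounded `take`, it re-checks its `running` flag regularly. That means flipping the flag alone can stop it, without relying on the task thread being interrupted.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical fetch loop; the queue is a stand-in for a broker connection.
final class PollingFetchLoop {
    private volatile boolean running = true;

    /** Graceful stop: only flips the flag; no Thread.interrupt() is needed. */
    void stop() {
        running = false;
    }

    /** Emits records until stopped; returns how many records were emitted. */
    long run(BlockingQueue<String> broker, Consumer<String> out) throws InterruptedException {
        long emitted = 0;
        while (running) {
            // A bounded wait lets the loop observe `running` even when no data arrives.
            // An unbounded broker.take() would block forever here unless the thread is interrupted.
            String record = broker.poll(100, TimeUnit.MILLISECONDS);
            if (record != null) {
                out.accept(record);
                emitted++;
            }
        }
        return emitted;
    }
}
```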

Furthermore, adding the `stop` method to the `SourceFunction` interface 
would be an API-breaking change, and all users would henceforth have to 
implement `stop`. I think a `StoppableOperator` interface would be more modular 
and less restrictive, because implementers remain free not to provide a graceful 
stop operation.
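A minimal Java sketch of the opt-in alternative, assuming simplified stand-ins. `SourceFunction` stays unchanged, and only sources that can stop gracefully implement the separate `StoppableOperator` interface named in the comment. `StopDispatcher`, `LegacySource`, and `StoppableSource` are hypothetical illustrations.

```java
import java.util.function.Consumer;

// Existing user-facing interface stays unchanged (simplified stand-in for SourceFunction).
interface SourceFunction<T> {
    void run(Consumer<T> out);
    void cancel();
}

// Opt-in capability: only sources that can stop gracefully implement it.
interface StoppableOperator {
    void stop();
}

final class StopDispatcher {
    /** Returns true if the stop was delivered, false if the source does not support a graceful stop. */
    static boolean tryStop(Object source) {
        if (source instanceof StoppableOperator) {
            ((StoppableOperator) source).stop();
            return true;
        }
        return false;
    }
}

// A source written before stopping existed: it still compiles without any change.
class LegacySource implements SourceFunction<String> {
    @Override public void run(Consumer<String> out) { out.accept("legacy"); }
    @Override public void cancel() { }
}

// A source that opts in to graceful stopping.
class StoppableSource implements SourceFunction<String>, StoppableOperator {
    volatile boolean stopped = false;

    @Override public void run(Consumer<String> out) { if (!stopped) out.accept("data"); }
    @Override public void cancel() { stopped = true; }
    @Override public void stop() { stopped = true; }
}
```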

What do you think?




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137016#comment-15137016
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-181402377
  
Ok. Your arguments are very strong. I will update this PR accordingly.

About `FlinkKafkaConsumer08` (and actually all other available sources): 
IMHO, we need a new JIRA to update all sources with a proper `stop()` 
implementation (where suitable). This will include taking a last 
consistent snapshot, etc. We should also enable stopping in combination with the 
recently introduced savepoints. But of course, I don't want to extend this PR 
further; those features should be added as later improvements on top of the STOP feature.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15137057#comment-15137057
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-181419327
  
Yes exactly. Let us tackle the next steps (adding stopping functionality to 
more sources, consistent behaviour, drawing savepoint, etc.) once we have a 
first version of the stop signal merged.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15136295#comment-15136295
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-181040537
  
I don't see much of a difference here.
(1) Of course, we can have a stoppable flag instead of the `JobType`, but I 
don't see any advantage. Why should it be bad to add a job type? It is 
basically meta information in both cases anyway. And for batch jobs, being 
stoppable does not make much sense anyway.
(2) From my point of view, all `StreamSource`s should be stoppable. I 
cannot think of a source that is not stoppable. I am also wondering why 
Kafka sources should not be properly stoppable (on stop, they simply do not 
consume any more data from Kafka).
(3) If we keep it as is, there is no reason to infer this. If we apply 
your changes, I agree that it would make sense to detect and set the stoppable 
flag automatically.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15136357#comment-15136357
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-181072411
  
We have managed so far not to make a distinction between batch and 
streaming jobs, which was very good. Given that the future plan is for 
streaming jobs to subsume everything, introducing such a distinction now starts 
this off in the wrong direction, in my opinion.

Defining a flag for what it actually should mean (i.e. whether the job is 
stoppable) ties the change to the feature, rather than introducing a 
distinction we work hard not to make.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135236#comment-15135236
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-180617867
  
OK, let us include, as a first version, a stop implementation without 
the strong consistency guarantees. But before that, I think we should still 
change three things.

The first thing I would like to change is the `JobType`. I feel a bit 
uneasy adding explicit information about whether a job is a streaming or a batch 
job to the `JobGraph`. So far we've succeeded in keeping the runtime agnostic to 
the type of job. I would propose replacing it with a `stoppable` flag 
indicating whether the job is stoppable or not. Semantically, it should not 
make a big difference.

The second thing I would propose is to remove `stop` from the 
`StreamSource` class, so that it is not mandatory for all streaming 
sources to be stoppable. Instead, I would introduce a `StoppableOperator` 
interface which can be implemented by streaming sources that are in fact 
stoppable. I'm not so sure, for example, whether the Kafka sources can be stopped 
properly without throwing exceptions. By introducing this interface we can 
selectively implement stop methods for sources. A nice side effect is that 
existing code won't break if you have a custom source implementation.

And as a last point, I would propose to automatically infer whether a job is 
stoppable or not. What we have to do is check that all source functions 
implement the `StoppableOperator` interface. This should be doable in the 
`StreamGraphGenerator`. Based on this information, we can then set the stoppable 
flag in the `JobGraph` accordingly.
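The inference step proposed above could look roughly like this Java sketch. `JobGraphSketch` is a simplified hypothetical stand-in for a job graph that carries a `stoppable` flag instead of a `JobType`. `StoppableInference.buildJobGraph` is a hypothetical hook, not the real `StreamGraphGenerator` API. Treating a job with no sources as non-stoppable is a conservative assumption.

```java
import java.util.List;

// Opt-in marker proposed in the comment above (hypothetical stand-in).
interface StoppableOperator {
    void stop();
}

// Simplified stand-in for a JobGraph carrying a stoppable flag instead of a JobType.
final class JobGraphSketch {
    private boolean stoppable;

    void setStoppable(boolean stoppable) { this.stoppable = stoppable; }

    boolean isStoppable() { return stoppable; }
}

final class StoppableInference {
    /** A job is stoppable only if it has at least one source and every source opts in. */
    static boolean allSourcesStoppable(List<?> sourceFunctions) {
        if (sourceFunctions.isEmpty()) {
            return false; // conservative assumption: nothing to stop gracefully
        }
        for (Object function : sourceFunctions) {
            if (!(function instanceof StoppableOperator)) {
                return false;
            }
        }
        return true;
    }

    /** Hypothetical hook that a graph generator could call while building the job graph. */
    static JobGraphSketch buildJobGraph(List<?> sourceFunctions) {
        JobGraphSketch jobGraph = new JobGraphSketch();
        jobGraph.setStoppable(allSourcesStoppable(sourceFunctions));
        return jobGraph;
    }
}
```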

What do you think? Would that be feasible?




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15130116#comment-15130116
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-179126064
  
The build failed due to a compilation error. This seems to be a caching problem. 
It builds without problems locally and on my personal Travis: 
https://travis-ci.org/apache/flink/builds/106583181
(the test errors on personal Travis are a known issue: 
https://issues.apache.org/jira/browse/FLINK-3313)

@tillrohrmann Please review.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15130446#comment-15130446
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-179261307
  
I think we can get this done today or tomorrow and then finally merge the 
PR. Looking forward to having this code in the master branch :-)




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-02-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15130443#comment-15130443
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-179260525
  
Good work @mjsax. I had one last comment. It concerns stopping a job when 
it's in the state `RESTARTING`. I think we have to allow the state transition 
from `RESTARTING` to `FINISHED` when the user calls stop on the job (see my code 
comment).
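A Java sketch of the transition rule being proposed: `RESTARTING` to `FINISHED` is permitted only when a stop was requested. The `JobState` enum and the base transition table are simplified hypothetical stand-ins, not Flink's actual `JobStatus` state machine.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;

// Simplified job states for illustration (not Flink's full JobStatus enum).
enum JobState { CREATED, RUNNING, RESTARTING, FINISHED, CANCELED, FAILED }

final class JobStateTransitions {
    // Hypothetical base transitions, independent of any stop request.
    private static final Map<JobState, EnumSet<JobState>> BASE = new EnumMap<>(JobState.class);

    static {
        BASE.put(JobState.CREATED, EnumSet.of(JobState.RUNNING, JobState.CANCELED, JobState.FAILED));
        BASE.put(JobState.RUNNING,
                EnumSet.of(JobState.FINISHED, JobState.RESTARTING, JobState.CANCELED, JobState.FAILED));
        BASE.put(JobState.RESTARTING, EnumSet.of(JobState.RUNNING, JobState.CANCELED, JobState.FAILED));
    }

    /** RESTARTING -> FINISHED is only valid when the user has requested a stop. */
    static boolean isAllowed(JobState from, JobState to, boolean stopRequested) {
        if (from == JobState.RESTARTING && to == JobState.FINISHED) {
            return stopRequested;
        }
        EnumSet<JobState> targets = BASE.get(from);
        return targets != null && targets.contains(to);
    }
}
```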

What do you think?




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15106486#comment-15106486
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-172791340
  
Are we gonna get this in for the 1.0 release?




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15106551#comment-15106551
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-172805169
  
Great to hear :-) I guess in the next couple of weeks, but of course it depends 
on the community. 




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15106495#comment-15106495
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-172792701
  
That is my plan. Is there already a planned date for the 1.0 release?




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15093904#comment-15093904
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49455315
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/StoppableInvokable.java
 ---
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.Stoppable;
+
+public final class StoppableInvokable extends AbstractInvokable implements Stoppable {
+   private boolean isRunning = true;
+
+   @Override
+   public void registerInputOutput() {}
+
+   @Override
+   public void invoke() throws Exception {
+       while(isRunning) {
+           Thread.sleep(100);
+       }
+   }
+
+   @Override
+   public void stop() {
+       this.isRunning = false;
+   }
+}
--- End diff --

No newline at end of file
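There is a separate issue from the missing newline. `invoke()` polls `isRunning` on the task thread while `stop()` is called from another thread. With a plain `boolean` field, the Java memory model gives no guarantee that the write ever becomes visible to the loop. The minimal self-contained sketch below reproduces the same loop without Flink dependencies and declares the flag `volatile`. The class name `PollingStoppable` is hypothetical.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical stand-in for the test invokable above. invoke() runs on the task
 * thread, stop() is called from another thread, so the flag is volatile to make
 * the write visible to the polling loop.
 */
final class PollingStoppable {
    private volatile boolean isRunning = true;

    /** Blocks until stop() is called, sleeping briefly between checks. */
    void invoke() throws InterruptedException {
        while (isRunning) {
            TimeUnit.MILLISECONDS.sleep(10);
        }
    }

    void stop() {
        isRunning = false;
    }
}
```

`volatile` suffices here because the flag is only ever written from `true` to `false`, so no read-modify-write is involved.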




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15093897#comment-15093897
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49455037
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
 ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.taskmanager;
+
+import java.lang.reflect.Field;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.Stoppable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ TaskDeploymentDescriptor.class, JobID.class, FiniteDuration.class })
+public class TaskStopTest {
+   private AbstractInvokable taskMock;
+   private Task task;
+
+   public void doMocking() throws Exception {
+       TaskDeploymentDescriptor tddMock = mock(TaskDeploymentDescriptor.class);
+       when(tddMock.getNumberOfSubtasks()).thenReturn(1);
+       when(tddMock.getJobID()).thenReturn(mock(JobID.class));
+       when(tddMock.getVertexID()).thenReturn(mock(JobVertexID.class));
+       when(tddMock.getExecutionId()).thenReturn(mock(ExecutionAttemptID.class));
+       when(tddMock.getTaskName()).thenReturn("taskName");
+       when(tddMock.getJobConfiguration()).thenReturn(mock(Configuration.class));
+       when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class));
+       when(tddMock.getInvokableClassName()).thenReturn("className");
+
+       task = new Task(tddMock, mock(MemoryManager.class), mock(IOManager.class), mock(NetworkEnvironment.class),
+               mock(BroadcastVariableManager.class), mock(ActorGateway.class), mock(ActorGateway.class),
+               mock(FiniteDuration.class), mock(LibraryCacheManager.class), mock(FileCache.class),
+               mock(TaskManagerRuntimeInfo.class));
+       Field f = task.getClass().getDeclaredField("invokable");
+       f.setAccessible(true);
+       f.set(task, taskMock);
--- End diff --

The same still applies to `task`. Why do you use a class field for a return 
value?
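One way to address this is to build the task in a helper that returns it, instead of assigning it to a test-class field. The self-contained sketch below mirrors the `getDeclaredField("invokable")` injection from the diff. `FakeTask` and `createTaskWithInvokable` are hypothetical stand-ins for `Task` and `doMocking`.

```java
import java.lang.reflect.Field;

/** Hypothetical stand-in for Task: the invokable is a private field set internally. */
final class FakeTask {
    private Object invokable;

    Object getInvokable() {
        return invokable;
    }
}

final class TaskTestHelper {
    /**
     * Creates a task and injects the given invokable via reflection. The task is
     * returned to the caller rather than stored in a field of the test class.
     */
    static FakeTask createTaskWithInvokable(Object invokable) throws ReflectiveOperationException {
        FakeTask task = new FakeTask();
        Field f = FakeTask.class.getDeclaredField("invokable");
        f.setAccessible(true);
        f.set(task, invokable);
        return task;
    }
}
```

Each test then holds the returned task in a local variable, so tests do not share mutable state through fields.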



[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15093880#comment-15093880
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49454567
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.BaseTestingActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import scala.concurrent.ExecutionContext;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionVertex.class)
+public class ExecutionVertexStopTest {
--- End diff --

`extends TestLogger` missing




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15093753#comment-15093753
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49441547
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -402,21 +405,52 @@ public void onComplete(Throwable failure, Object success) throws Throwable {
}
}
 
+   public void stop() {
+       // sends stop RPC call
--- End diff --

I think this belongs in the JavaDocs of this function.
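Moved into Javadoc, the comment could read as in the self-contained, hypothetical sketch below. `ExecutionStopSketch` stands in for `Execution`, and a `Predicate<String>` stands in for the RPC to the `TaskManager`. The retry loop is also a simplification: it runs synchronously, whereas the RPC in the actual patch returns a future.

```java
import java.util.function.Predicate;

final class ExecutionStopSketch {
    private final String attemptId;
    // Hypothetical RPC: returns true if the TaskManager acknowledged the stop request.
    private final Predicate<String> sendStopCall;
    private final int maxTries;

    ExecutionStopSketch(String attemptId, Predicate<String> sendStopCall, int maxTries) {
        this.attemptId = attemptId;
        this.sendStopCall = sendStopCall;
        this.maxTries = maxTries;
    }

    /**
     * Sends a stop RPC call for this execution attempt to the TaskManager that runs it.
     *
     * <p>The call is retried up to {@code maxTries} times if it is not acknowledged.
     *
     * @return {@code true} if the TaskManager acknowledged the stop request
     */
    boolean stop() {
        for (int attempt = 0; attempt < maxTries; attempt++) {
            if (sendStopCall.test(attemptId)) {
                return true;
            }
        }
        return false;
    }
}
```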




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15093854#comment-15093854
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49451808
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobType;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StoppingException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.api.mockito.PowerMockito;
+
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.same;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionGraph.class)
+public class ExecutionGraphSignalsTest {
+   private ExecutionJobVertex[] mockEJV = new ExecutionJobVertex[5];
+   private int[] dop = new int[] { 5, 7, 2, 11, 4 };
+   private ExecutionVertex[][] mockEV = new ExecutionVertex[mockEJV.length][];
+   private ExecutionGraph eg;
+   private Field f;
+
+   @Before
+   public void prepare() throws Exception {
+       final JobID jobId = new JobID();
+       final String jobName = "Test Job Sample Name";
+       final Configuration cfg = new Configuration();
+
+
+       assert (mockEJV.length == 5);
+       JobVertex v1 = new JobVertex("vertex1");
+       JobVertex v2 = new JobVertex("vertex2");
+       JobVertex v3 = new JobVertex("vertex3");
+       JobVertex v4 = new JobVertex("vertex4");
+       JobVertex v5 = new JobVertex("vertex5");
+
+       for(int i = 0; i < mockEJV.length; ++i) {
+           mockEJV[i] = mock(ExecutionJobVertex.class);
+
+           this.mockEV[i] = new ExecutionVertex[dop[i]];
+           for (int j = 0; j < dop[i]; ++j) {
+               this.mockEV[i][j] = mock(ExecutionVertex.class);
+           }
+
+           when(mockEJV[i].getProducedDataSets()).thenReturn(new IntermediateResult[0]);
+           when(mockEJV[i].getTaskVertices()).thenReturn(this.mockEV[i]);
+       }
+
+       PowerMockito
+           .whenNew(ExecutionJobVertex.class)
+           .withArguments(any(ExecutionGraph.class), same(v1), any(Integer.class).intValue(),
+               any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[0]);
+       PowerMockito
+           .whenNew(ExecutionJobVertex.class)
+           .withArguments(any(ExecutionGraph.class), same(v2), any(Integer.class).intValue(),
+               any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[1]);
+       PowerMockito
+ 

[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15093752#comment-15093752
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49441422
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/StoppingException.java ---
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime;
+
+public class StoppingException extends Exception {
--- End diff --

JavaDocs missing
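The sketch below shows one possible Javadoc. It assumes the exception is raised when a stop is requested for a job whose sources do not support stopping. The class is a self-contained stand-in, not the class from the patch.

```java
/**
 * Thrown when a job cannot be stopped, for example because at least one of its
 * source tasks does not implement the {@code Stoppable} interface.
 *
 * <p>Unlike cancelling, stopping is a graceful shutdown initiated by the sources,
 * so a job with non-stoppable sources has to be cancelled instead.
 */
class StoppingException extends Exception {
    private static final long serialVersionUID = 1L;

    StoppingException(String message) {
        super(message);
    }

    StoppingException(String message, Throwable cause) {
        super(message, cause);
    }
}
```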




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15093757#comment-15093757
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49442110
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/Stoppable.java
 ---
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.runtime.jobgraph.tasks;
+
+/**
+ * Must be implemented by stoppable tasks.
--- End diff --

What are stoppable tasks? A bit more context would be helpful, e.g. that the 
interface should only be implemented by streaming sources, that the method `stop` 
is called when the JM receives a stop message, etc.
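The sketch below shows Javadoc carrying that context. It restates the interface under a hypothetical name (`StoppableSketch`) and pairs it with a hypothetical source (`CountingSource`) that honours the contract by leaving its emit loop and returning normally.

```java
/**
 * Must be implemented by tasks that support a graceful stop. Only streaming source
 * tasks should implement it: when the JobManager receives a stop message for a job,
 * {@link #stop()} is called on its sources, which then stop emitting data and return
 * normally, so the whole job can shut down cleanly (unlike cancel, which is a hard stop).
 */
interface StoppableSketch {
    void stop();
}

/** Hypothetical source that "emits" records by counting them until it is stopped. */
final class CountingSource implements StoppableSketch {
    // Written by the stopping thread, read by the source thread.
    private volatile boolean running = true;
    private long emitted;

    /** Emits until stopped or until maxRecords is reached, then returns normally. */
    long run(long maxRecords) {
        while (running && emitted < maxRecords) {
            emitted++; // stands in for collecting one record
        }
        return emitted;
    }

    @Override
    public void stop() {
        running = false;
    }
}
```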




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15093990#comment-15093990
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49462409
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -742,10 +743,38 @@ private void notifyFatalError(String message, Throwable cause) {
}
 
// ------------------------------------------------------------------------
-   //  Canceling / Failing the task from the outside
+   //  Stopping / Canceling / Failing the task from the outside
// ------------------------------------------------------------------------
 
/**
+    * Stops the executing task by calling {@link Stoppable#stop()}.
+    * 
+    * This method never blocks.
+    * 
+    * 
+    * @throws UnsupportedOperationException
+    *             if the {@link AbstractInvokable} does not implement {@link Stoppable}
+    */
+   public void stopExecution() throws UnsupportedOperationException {
+       LOG.info("Attempting to stop task " + taskNameWithSubtask);
+       if(this.invokable instanceof Stoppable) {
+           Runnable runnable = new Runnable() {
+               @Override
+               public void run() {
+                   try {
+                       ((Stoppable)Task.this.invokable).stop();
+                   } catch(RuntimeException e) {
+                       LOG.error("Stopping task " + taskNameWithSubtask + " failed.", e);

Shouldn't a `RuntimeException` cause the task to fail? We should notify the 
`JobManager` about this and fail the corresponding task. E.g. we could send a 
`FailTask` message to the `TaskManager` to fail the task.
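Below is a minimal sketch of the suggested behaviour, using hypothetical names. The stop call still runs asynchronously, so the caller never blocks. A `RuntimeException`, however, is handed to a `failTask` callback instead of only being logged. The callback stands in for sending a `FailTask` message to the `TaskManager`.

```java
import java.util.concurrent.Executor;
import java.util.function.Consumer;

final class StopExecutionSketch {
    /** Hypothetical stand-in for Flink's Stoppable interface. */
    interface Stoppable {
        void stop();
    }

    private final Object invokable;
    private final Executor executor;
    // Hypothetical hook that fails the task, e.g. by messaging the TaskManager.
    private final Consumer<Throwable> failTask;

    StopExecutionSketch(Object invokable, Executor executor, Consumer<Throwable> failTask) {
        this.invokable = invokable;
        this.executor = executor;
        this.failTask = failTask;
    }

    /** Triggers Stoppable#stop() on the executor; never blocks the caller. */
    void stopExecution() {
        if (!(invokable instanceof Stoppable)) {
            throw new UnsupportedOperationException("Invokable does not implement Stoppable");
        }
        final Stoppable stoppable = (Stoppable) invokable;
        executor.execute(() -> {
            try {
                stoppable.stop();
            } catch (RuntimeException e) {
                // Fail the task instead of leaving it running in an undefined state.
                failTask.accept(e);
            }
        });
    }
}
```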




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15094028#comment-15094028
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-170942707
  
Good work @mjsax. I think the testing of the REST interface is sufficient, 
even without a dedicated YARN test case.  

I had some more comments concerning how failed stop calls are handled. The 
first problem is still the handling of exceptions when calling `stop` on the 
`Invokable` in `Task.stopExecution`. The exception is only logged and no 
further action is taken. This can lead to a corrupted state. I think we should 
fail the task in such a situation.

Additionally, the case where a task cannot be found on the `TaskManager` and 
the case where an exception occurs in `Task.stopExecution` are treated 
identically: both send a `TaskOperationResult` with `success == false` to the 
`JobManager`, which only logs it. I think the exception case should be handled 
differently, for example by failing the execution.

It is also still possible to send a `StopJob` message to the `JobManager`, 
which sees that the job is in state `RUNNING`; then the `ExecutionGraph` 
switches to `RESTARTING`, and the stop call executed on the `ExecutionGraph` 
has no effect. As a user you receive a `StoppingSuccess` message, but the job 
is simply restarted. I think we should also allow stopping jobs that are in 
the state `RESTARTING`.

What do you think?




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15094011#comment-15094011
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49463485
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -402,21 +405,52 @@ public void onComplete(Throwable failure, Object success) throws Throwable {
}
}
 
+   public void stop() {
+       // sends stop RPC call
+
+       final SimpleSlot slot = this.assignedResource;
+
+       if (slot != null) {
+           final ActorGateway gateway = slot.getInstance().getActorGateway();
+
+           Future<Object> stopResult = gateway.retry(
+               new StopTask(attemptId),
+               NUM_STOP_CALL_TRIES,
+               timeout,
+               executionContext);
+
+           stopResult.onComplete(new OnComplete<Object>() {
+               @Override
+               public void onComplete(Throwable failure, Object success) throws Throwable {
+                   if (failure != null) {
+                       fail(new Exception("Task could not be stopped.", failure));
+                   } else {
+                       TaskOperationResult result = (TaskOperationResult) success;
+                       if (!result.success()) {
--- End diff --

Does this condition really say that the task could not be found? In the 
current implementation a `TaskOperationResult` message with `success == false` 
is also returned if an exception is thrown in `Task.stop`.
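
The distinction raised here could be made explicit by replacing the boolean with an outcome code. A minimal Java sketch, assuming hypothetical names (`StopOutcome`, `StopAction`, `StopResultHandler` are not Flink's API); the mapping only illustrates how the `Execution` side might react to each case.

```java
/** Hypothetical outcome codes for a stop RPC (the PR's reply only carries a boolean). */
enum StopOutcome { STOPPED, TASK_NOT_FOUND, NOT_STOPPABLE, STOP_FAILED }

/** What the JobManager side might do for each outcome (hypothetical). */
enum StopAction { NONE, LOG_NOT_FOUND, LOG_NOT_SUPPORTED, FAIL_EXECUTION }

final class StopResultHandler {
    private StopResultHandler() {}

    /** Maps each outcome to a distinct reaction instead of one "not found" log line. */
    static StopAction actionFor(StopOutcome outcome) {
        switch (outcome) {
            case STOPPED:
                return StopAction.NONE;
            case TASK_NOT_FOUND:
                return StopAction.LOG_NOT_FOUND;
            case NOT_STOPPABLE:
                return StopAction.LOG_NOT_SUPPORTED;
            case STOP_FAILED:
                return StopAction.FAIL_EXECUTION;
            default:
                throw new IllegalArgumentException("Unknown outcome: " + outcome);
        }
    }
}
```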


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15093979#comment-15093979
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49461844
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -417,6 +417,28 @@ class TaskManager(
           log.debug(s"Cannot find task to fail for execution $executionID)")
         }
 
+      // stops a task
+      case StopTask(executionID) =>
+        val task = runningTasks.get(executionID)
+        if (task != null) {
+          try {
+            task.stopExecution()
+            sender ! decorateMessage(new TaskOperationResult(executionID, true))
+          } catch {
+            case t: Throwable =>
+              sender ! new TaskOperationResult(executionID, false,
--- End diff --

Sending back a `TaskOperationResult(.., false, ...)` for every `Throwable` 
will result in a log statement on the `JobManager` side saying that the 
corresponding task could not be found. This seems wrong, since the task was 
found but an exception occurred while calling stop on it. The `stop` method of 
`Task` can throw an `UnsupportedOperationException`, which on the `JobManager` 
side should lead to the log statement `Task does not support stop`. Every other 
exception should lead to a cancellation of the corresponding `Task`, because 
something obviously went wrong.
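
On the `TaskManager` side, the exception handling described above could classify the result of the stop call before replying. A minimal sketch, assuming a hypothetical `StoppableTask` interface as a stand-in for `Task.stopExecution()`; `TaskStopOutcome` and `StopTaskHandler` are illustrative names only.

```java
/** Hypothetical outcome codes sent back to the JobManager. */
enum TaskStopOutcome { STOPPED, TASK_NOT_FOUND, NOT_STOPPABLE, STOP_FAILED }

/** Stand-in for Task.stopExecution(); hypothetical interface for this sketch. */
interface StoppableTask {
    void stopExecution() throws Exception;
}

final class StopTaskHandler {
    private StopTaskHandler() {}

    /**
     * Classifies a stop attempt. A null task means the TaskManager has no
     * running task for the given execution ID.
     */
    static TaskStopOutcome tryStop(StoppableTask task) {
        if (task == null) {
            return TaskStopOutcome.TASK_NOT_FOUND;
        }
        try {
            task.stopExecution();
            return TaskStopOutcome.STOPPED;
        } catch (UnsupportedOperationException e) {
            // The task was found, but its invokable does not support stopping.
            return TaskStopOutcome.NOT_STOPPABLE;
        } catch (Exception e) {
            // Anything else: the JobManager should cancel the task.
            return TaskStopOutcome.STOP_FAILED;
        }
    }
}
```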


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15094007#comment-15094007
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49463254
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -427,6 +427,28 @@ class JobManager(
   )
   }
 
+case StopJob(jobID) =>
+  log.info(s"Trying to stop job with ID $jobID.")
+
+  currentJobs.get(jobID) match {
+case Some((executionGraph, _)) =>
+  try {
+if (executionGraph.getState() != JobStatus.RUNNING) {
--- End diff --

This guard is not sufficient since job state changes can happen 
asynchronously. Thus, the current state might be `JobStatus.RUNNING` when 
checking this condition, but change to `JobStatus.RESTARTING`, for example, 
before you execute `executionGraph.stop()`. As a consequence, the sender will 
receive a `StoppingSuccess` message even though the job won't be stopped.
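
One way to close this check-then-act gap is to perform the state check and the stop under the same lock that guards state transitions. A minimal sketch with a hypothetical `GuardedJob` class and a reduced `JobState` enum; it is not how `ExecutionGraph` is actually structured.

```java
/** Reduced set of job states for this sketch (hypothetical). */
enum JobState { CREATED, RUNNING, RESTARTING, FINISHED }

final class GuardedJob {
    private final Object lock = new Object();
    private JobState state = JobState.CREATED;
    private boolean stopRequested = false;

    /** Every state transition takes the same lock as stop(). */
    void transitionTo(JobState newState) {
        synchronized (lock) {
            state = newState;
        }
    }

    /**
     * Checks the state and records the stop under one lock, so no transition
     * (e.g. RUNNING -> RESTARTING) can happen between check and act.
     * Returns true only if the stop was actually issued.
     */
    boolean stop() {
        synchronized (lock) {
            if (state != JobState.RUNNING) {
                return false;
            }
            stopRequested = true; // placeholder for sending stop signals to the sources
            return true;
        }
    }

    boolean isStopRequested() {
        synchronized (lock) {
            return stopRequested;
        }
    }
}
```

Even with this, a positive reply only means the stop was issued, not that the job has finished; the caller would still have to watch for the final job status.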


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15094012#comment-15094012
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49463705
  
--- Diff: flink-runtime-web/web-dashboard/web/partials/jobs/job.html ---
@@ -33,6 +33,7 @@
   {{ job['end-time'] | amDateFormat:'-MM-DD, H:mm:ss' }}
   {{job.duration | humanizeDuration:true}}
   Cancel
+  Stop
--- End diff --

Why can't I stop a Flink job when it is in state `RESTARTING`?


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15094057#comment-15094057
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-170947333
  
Once we've resolved these problems, I'll test the code on the cluster and 
YARN. If they pass, then we should merge this PR.


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15085210#comment-15085210
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-169267260
  
Just updated this. I did not add an additional YARN test, because I test 
`GET` and `DELETE` in the REST test. About your last comment regarding 
`Execution`: the behavior of STOP is the same as for CANCEL. I think we should 
keep it this way for consistency.

Btw: I have never tested in a YARN setup...


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-01-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15085608#comment-15085608
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-169344534
  
Travis failed with a weird compilation error. I guess this is a caching 
issue. It builds locally and on my personal Travis: 
https://travis-ci.org/mjsax/flink/builds/100576933


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15081136#comment-15081136
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-168673112
  
Just curious what the current state of this PR is.


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15081458#comment-15081458
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-168749820
  
Great to hear :-)

On Mon, Jan 4, 2016 at 6:33 PM, Matthias J. Sax 
wrote:

> Addressing your comments is WIP... Hope to get it done this week.



[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2016-01-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15081422#comment-15081422
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-168746132
  
Addressing your comments is WIP... Hope to get it done this week.


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15041420#comment-15041420
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46670585
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -732,6 +758,18 @@ else if (current == JobStatus.RESTARTING) {
         }
     }
 
+    public void stop() {
+        if(jobType == JobType.STREAMING) {
+            for(ExecutionVertex ev : this.getAllExecutionVertices()) {
+                if(ev.getNumberOfInputs() == 0) { // send signal to sources only
+                    ev.stop();
+                }
+            }
+        } else {
+            throw new RuntimeException("STOP is only supported by streaming jobs.");
--- End diff --

But I think that it is a good idea to introduce a `StoppingException` 
because it's definitely not a `RuntimeException`.
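
A checked `StoppingException`, as suggested here, makes callers handle the non-streaming case at compile time. A minimal sketch; `StoppableGraph` and the local `JobType` enum are hypothetical stand-ins for `ExecutionGraph` and Flink's own `JobType`, and the counter merely replaces the loop over source vertices.

```java
/** Hypothetical checked exception signalling that a job cannot be stopped. */
class StoppingException extends Exception {
    StoppingException(String message) {
        super(message);
    }
}

/** Local stand-in for the job type used in the PR. */
enum JobType { BATCH, STREAMING }

final class StoppableGraph {
    private final JobType jobType;
    private int stopSignalsSent = 0;

    StoppableGraph(JobType jobType) {
        this.jobType = jobType;
    }

    /** The compiler forces callers to deal with the non-streaming case. */
    void stop() throws StoppingException {
        if (jobType != JobType.STREAMING) {
            throw new StoppingException("STOP is only supported by streaming jobs.");
        }
        stopSignalsSent++; // placeholder for signalling the source vertices
    }

    int stopSignalsSent() {
        return stopSignalsSent;
    }
}
```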


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15041381#comment-15041381
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46667227
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -732,6 +758,18 @@ else if (current == JobStatus.RESTARTING) {
         }
     }
 
+    public void stop() {
+        if(jobType == JobType.STREAMING) {
+            for(ExecutionVertex ev : this.getAllExecutionVertices()) {
+                if(ev.getNumberOfInputs() == 0) { // send signal to sources only
+                    ev.stop();
+                }
+            }
+        } else {
+            throw new RuntimeException("STOP is only supported by streaming jobs.");
--- End diff --

Sorry @mjsax, my bad. I overlooked the try catch block in the `JobManager`. 
Should work perfectly fine :-)


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15041418#comment-15041418
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46670412
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 ---
@@ -227,4 +235,121 @@ public void testRequestPartitionState() throws Exception {
             }
         }};
     }
+
+    @Test
+    public void testStopSignal() throws Exception {
+        new JavaTestKit(system) {{
+            // Setup
+            TestingCluster cluster = null;
+
+            try {
+                cluster = startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT());
+
+                // Create a task
+                final JobVertex sender = new JobVertex("Sender");
+                sender.setParallelism(2);
+                sender.setInvokableClass(StoppableInvokable.class);
+
+                final JobGraph jobGraph = new JobGraph("Blocking test job", JobType.STREAMING, sender);
+                final JobID jid = jobGraph.getJobID();
+
+                final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
+
+                // we can set the leader session ID to None because we don't use this gateway to send messages
+                final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+
+                // Submit the job and wait for all vertices to be running
+                jobManagerGateway.tell(
+                    new SubmitJob(
+                        jobGraph,
+                        ListeningBehaviour.EXECUTION_RESULT),
+                    testActorGateway);
+                expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+                jobManagerGateway.tell(new StopJob(jid), testActorGateway);
+
+                // - The test --
+                expectMsgClass(StoppingSuccess.class);
--- End diff --

The problem is that `StoppableInvokable` cannot be instantiated. Thus, the 
stop signal does not stop anything, because the tasks go directly into the 
state `FAILED`. Furthermore, I noticed that we don't need the 
`NotifyWhenJobRemoved` message because we can wait for the `JobResultSuccess` 
message. I've fixed the test and opened a PR against your branch.
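
The test fails early when the runtime cannot create the invokable reflectively. A minimal sketch, assuming instantiation through a public no-argument constructor via `Class.getConstructor().newInstance()`; `InvokableInstantiation`, `GoodInvokable` and `BadInvokable` are hypothetical, and a private constructor is only one of several ways a class can fail this check.

```java
import java.lang.reflect.InvocationTargetException;

final class InvokableInstantiation {
    private InvokableInstantiation() {}

    /**
     * Reflective instantiation through a public no-argument constructor.
     * Returns null instead of throwing, so a test can assert on it directly.
     */
    static <T> T tryInstantiate(Class<T> clazz) {
        try {
            return clazz.getConstructor().newInstance();
        } catch (NoSuchMethodException | InstantiationException
                | IllegalAccessException | InvocationTargetException e) {
            return null;
        }
    }
}

/** Hypothetical invokable with a public no-arg constructor: instantiable. */
class GoodInvokable {
    public GoodInvokable() {}
}

/** Hypothetical invokable whose only constructor is private: not instantiable. */
class BadInvokable {
    private BadInvokable() {}
}
```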


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15041444#comment-15041444
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-161943293
  
When I use the stop button in the web interface, my jobs get cancelled. 
Using the CLI, stop terminates the job gracefully. Thus, there still seems to 
be an issue with the web interface.


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15041428#comment-15041428
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-161939523
  
Have you added the tests for the REST interface? I couldn't find them. In 
the future, it might be helpful if you simply push commits to your PR branch 
without squashing them. Otherwise it is impossible for me to see what you've 
actually changed with your last commit.

Concerning the REST test case, I assume that you have to start a 
`JobManager` with the `WebRuntimeMonitor` running. Then you submit a job and 
send a stop request to the web interface. I would assume that you can use any 
HTTP client for that. Afterwards you can check the JSON result and verify that 
the job has actually been stopped, which you can also do directly by querying 
the `JobManager`.

It would be important to also add such a test for the YARN use case. There, 
you can take a look at the existing YARN test cases to see how to start a YARN 
cluster. It's important because YARN only allows GET requests, if I'm not 
mistaken.
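
A REST test along these lines only needs a plain HTTP client. A minimal sketch using `java.net.HttpURLConnection`; the base URL and the paths `/jobs/<id>/stop` (DELETE) and `/jobs/<id>/yarn-stop` (GET, for the YARN proxy) are assumptions for illustration, and `StopRequests` is a hypothetical helper. The connection is only prepared here; calling `getResponseCode()` would actually send it.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;

final class StopRequests {
    private StopRequests() {}

    /**
     * Prepares (but does not send) a stop request. The endpoint paths are
     * assumptions; the YARN variant uses GET because the proxy is assumed
     * to forward only GET requests.
     */
    static HttpURLConnection prepareStop(String baseUrl, String jobId, boolean viaYarnProxy)
            throws IOException {
        String path = viaYarnProxy
                ? "/jobs/" + jobId + "/yarn-stop"
                : "/jobs/" + jobId + "/stop";
        HttpURLConnection conn =
                (HttpURLConnection) URI.create(baseUrl + path).toURL().openConnection();
        conn.setRequestMethod(viaYarnProxy ? "GET" : "DELETE");
        return conn; // conn.getResponseCode() would send the request
    }
}
```

After sending, the test would parse the JSON reply and then ask the `JobManager` whether the job really reached its final state.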


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15041478#comment-15041478
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-161949525
  
When fixing the `JobManagerTest`, I noticed the following. When the job was 
stopped while it was still in state `SCHEDULED` or `DEPLOYING`, one received a 
`StoppingSuccess`. The problem was that the stop was not executed and the job 
later switched to `RUNNING`.

The same can be observed if the job is in state `RESTARTING`. Stopping a 
restarting job does nothing, even though you receive a `StoppingSuccess` 
message. The job will later be redeployed.

As a user, I would expect the job to be stopped immediately, or at least at 
the next possible moment (e.g. when it's deployed). Otherwise, I would expect 
the system to tell me that stopping is not possible at the moment.

A similar question is what happens if only a subset of all sources is 
deployed and in state `RUNNING`. This would mean that the undeployed sources 
won't be notified of the stop signal and, thus, will be deployed normally.

Furthermore, what happens if the `stop` method of the `SourceFunction` 
throws an unchecked exception? If I'm not mistaken, it will only get logged. 
But shouldn't the task be cancelled in such a situation, because the state can 
no longer be guaranteed to be consistent?

The `Execution` treats the case that a `Task` is not `Stoppable` and the case 
that a `Task` cannot be found on the `TaskManager` identically. Both cases 
cause a `TaskOperationResult(executionID, false, message)` to be sent back to 
the `Execution`. There, it will be logged that the stop call "did not find the 
task". I think it would be good to differentiate the two cases.
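
One possible answer to the not-yet-running and partially deployed cases is to remember the stop request and deliver it to each source as soon as it reaches `RUNNING`. A minimal sketch with hypothetical `SourceVertex` and `DeferredStopGraph` classes; the alternative mentioned above, replying that stopping is currently not possible, would be an equally valid design.

```java
import java.util.ArrayList;
import java.util.List;

/** Reduced vertex states for this sketch (hypothetical). */
enum VertexState { SCHEDULED, DEPLOYING, RUNNING, FINISHED }

/** Hypothetical stand-in for a source ExecutionVertex. */
final class SourceVertex {
    VertexState state = VertexState.SCHEDULED;
    boolean stopSignalled = false;
}

/**
 * Running sources are signalled immediately; sources that are still
 * SCHEDULED or DEPLOYING are signalled as soon as they switch to RUNNING.
 */
final class DeferredStopGraph {
    private final List<SourceVertex> sources = new ArrayList<>();
    private boolean stopRequested = false;

    synchronized SourceVertex addSource() {
        SourceVertex v = new SourceVertex();
        sources.add(v);
        return v;
    }

    synchronized void stop() {
        stopRequested = true;
        for (SourceVertex v : sources) {
            if (v.state == VertexState.RUNNING) {
                v.stopSignalled = true;
            }
        }
    }

    /** Called when a source vertex switches to RUNNING. */
    synchronized void onRunning(SourceVertex v) {
        v.state = VertexState.RUNNING;
        if (stopRequested) {
            v.stopSignalled = true; // late sources still receive the stop
        }
    }
}
```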


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15041611#comment-15041611
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-161982097
  
Thanks. I will take care of it.


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15036471#comment-15036471
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46465735
  
--- Diff: flink-runtime-web/web-dashboard/package.json ---
@@ -7,27 +7,27 @@
   "devDependencies": {
 "browserify": "^9.0.3",
 "coffeeify": "^1.0.0",
-"gulp": "^3.8.11",
+"gulp": "^3.9.0",
 "gulp-browserify": "^0.5.1",
 "gulp-coffee": "^2.3.1",
 "gulp-coffeeify": "^0.1.8",
-"gulp-concat": "^2.5.2",
-"gulp-filter": "^2.0.2",
-"gulp-jade": "^1.0.0",
-"gulp-less": "^3.0.2",
-"gulp-livereload": "^3.8.0",
-"gulp-minify-css": "^1.0.0",
-"gulp-ng-annotate": "^0.5.2",
-"gulp-plumber": "^1.0.0",
+"gulp-concat": "^2.6.0",
+"gulp-filter": "^3.0.1",
+"gulp-jade": "^1.1.0",
+"gulp-less": "^3.0.5",
+"gulp-livereload": "^3.8.1",
+"gulp-minify-css": "^1.2.1",
+"gulp-ng-annotate": "^1.1.0",
+"gulp-plumber": "^1.0.1",
 "gulp-rename": "^1.2.0",
-"gulp-serve": "^0.3.1",
-"gulp-sourcemaps": "^1.5.1",
-"gulp-stylus": "^2.0.1",
-"gulp-uglify": "^1.1.0",
-"gulp-util": "^3.0.4",
+"gulp-serve": "^1.2.0",
+"gulp-sourcemaps": "^1.6.0",
+"gulp-stylus": "^2.1.0",
+"gulp-uglify": "^1.5.1",
+"gulp-util": "^3.0.7",
 "jade": "^1.9.2",
 "jadeify": "^4.1.0",
-"main-bower-files": "^2.6.2",
+"main-bower-files": "^2.9.0",
--- End diff --

I will just revert the file.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15036827#comment-15036827
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-161460588
  
Updated this PR. Extending the test is the last open question, unless I have 
missed something else.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15035625#comment-15035625
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46399920
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+public class ExecutionJobVertexTest {
+
+   @Test
+   public void testReceivedStopSignal() throws JobException {
--- End diff --

I agree, the more testing the better.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15035621#comment-15035621
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46399687
  
--- Diff: flink-runtime-web/web-dashboard/package.json ---
@@ -7,27 +7,27 @@
   "devDependencies": {
 "browserify": "^9.0.3",
 "coffeeify": "^1.0.0",
-"gulp": "^3.8.11",
+"gulp": "^3.9.0",
 "gulp-browserify": "^0.5.1",
 "gulp-coffee": "^2.3.1",
 "gulp-coffeeify": "^0.1.8",
-"gulp-concat": "^2.5.2",
-"gulp-filter": "^2.0.2",
-"gulp-jade": "^1.0.0",
-"gulp-less": "^3.0.2",
-"gulp-livereload": "^3.8.0",
-"gulp-minify-css": "^1.0.0",
-"gulp-ng-annotate": "^0.5.2",
-"gulp-plumber": "^1.0.0",
+"gulp-concat": "^2.6.0",
+"gulp-filter": "^3.0.1",
+"gulp-jade": "^1.1.0",
+"gulp-less": "^3.0.5",
+"gulp-livereload": "^3.8.1",
+"gulp-minify-css": "^1.2.1",
+"gulp-ng-annotate": "^1.1.0",
+"gulp-plumber": "^1.0.1",
 "gulp-rename": "^1.2.0",
-"gulp-serve": "^0.3.1",
-"gulp-sourcemaps": "^1.5.1",
-"gulp-stylus": "^2.0.1",
-"gulp-uglify": "^1.1.0",
-"gulp-util": "^3.0.4",
+"gulp-serve": "^1.2.0",
+"gulp-sourcemaps": "^1.6.0",
+"gulp-stylus": "^2.1.0",
+"gulp-uglify": "^1.5.1",
+"gulp-util": "^3.0.7",
 "jade": "^1.9.2",
 "jadeify": "^4.1.0",
-"main-bower-files": "^2.6.2",
+"main-bower-files": "^2.9.0",
--- End diff --

If this is not strictly needed because it's auto-generated, then this file 
should be excluded imo.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15035800#comment-15035800
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46414370
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---
@@ -227,4 +235,121 @@ public void testRequestPartitionState() throws Exception {
}
}};
}
+
+   @Test
+   public void testStopSignal() throws Exception {
+   new JavaTestKit(system) {{
+   // Setup
+   TestingCluster cluster = null;
+
+   try {
+   cluster = startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT());
+
+   // Create a task
+   final JobVertex sender = new JobVertex("Sender");
+   sender.setParallelism(2);
+   sender.setInvokableClass(StoppableInvokable.class);
+
+   final JobGraph jobGraph = new JobGraph("Blocking test job", JobType.STREAMING, sender);
+   final JobID jid = jobGraph.getJobID();
+
+   final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
+
+   // we can set the leader session ID to None because we don't use this gateway to send messages
+   final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+
+   // Submit the job and wait for all vertices to be running
+   jobManagerGateway.tell(
+   new SubmitJob(
+   jobGraph,
+   ListeningBehaviour.EXECUTION_RESULT),
+   testActorGateway);
+   expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+   jobManagerGateway.tell(new StopJob(jid), testActorGateway);
+
+   // - The test --
+   expectMsgClass(StoppingSuccess.class);
--- End diff --

@tillrohrmann, any input here? I am still stuck on extending this test. Or 
should I just leave it as is?




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15035796#comment-15035796
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46414197
  
--- Diff: flink-runtime-web/web-dashboard/package.json ---
@@ -7,27 +7,27 @@
   "devDependencies": {
 "browserify": "^9.0.3",
 "coffeeify": "^1.0.0",
-"gulp": "^3.8.11",
+"gulp": "^3.9.0",
 "gulp-browserify": "^0.5.1",
 "gulp-coffee": "^2.3.1",
 "gulp-coffeeify": "^0.1.8",
-"gulp-concat": "^2.5.2",
-"gulp-filter": "^2.0.2",
-"gulp-jade": "^1.0.0",
-"gulp-less": "^3.0.2",
-"gulp-livereload": "^3.8.0",
-"gulp-minify-css": "^1.0.0",
-"gulp-ng-annotate": "^0.5.2",
-"gulp-plumber": "^1.0.0",
+"gulp-concat": "^2.6.0",
+"gulp-filter": "^3.0.1",
+"gulp-jade": "^1.1.0",
+"gulp-less": "^3.0.5",
+"gulp-livereload": "^3.8.1",
+"gulp-minify-css": "^1.2.1",
+"gulp-ng-annotate": "^1.1.0",
+"gulp-plumber": "^1.0.1",
 "gulp-rename": "^1.2.0",
-"gulp-serve": "^0.3.1",
-"gulp-sourcemaps": "^1.5.1",
-"gulp-stylus": "^2.0.1",
-"gulp-uglify": "^1.1.0",
-"gulp-util": "^3.0.4",
+"gulp-serve": "^1.2.0",
+"gulp-sourcemaps": "^1.6.0",
+"gulp-stylus": "^2.1.0",
+"gulp-uglify": "^1.5.1",
+"gulp-util": "^3.0.7",
 "jade": "^1.9.2",
 "jadeify": "^4.1.0",
-"main-bower-files": "^2.6.2",
+"main-bower-files": "^2.9.0",
--- End diff --

@sachingoel0101 can you confirm that this file can be safely removed and 
"git-ignored" ?




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15035821#comment-15035821
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46415616
  
--- Diff: flink-runtime-web/web-dashboard/package.json ---
@@ -7,27 +7,27 @@
   "devDependencies": {
 "browserify": "^9.0.3",
 "coffeeify": "^1.0.0",
-"gulp": "^3.8.11",
+"gulp": "^3.9.0",
 "gulp-browserify": "^0.5.1",
 "gulp-coffee": "^2.3.1",
 "gulp-coffeeify": "^0.1.8",
-"gulp-concat": "^2.5.2",
-"gulp-filter": "^2.0.2",
-"gulp-jade": "^1.0.0",
-"gulp-less": "^3.0.2",
-"gulp-livereload": "^3.8.0",
-"gulp-minify-css": "^1.0.0",
-"gulp-ng-annotate": "^0.5.2",
-"gulp-plumber": "^1.0.0",
+"gulp-concat": "^2.6.0",
+"gulp-filter": "^3.0.1",
+"gulp-jade": "^1.1.0",
+"gulp-less": "^3.0.5",
+"gulp-livereload": "^3.8.1",
+"gulp-minify-css": "^1.2.1",
+"gulp-ng-annotate": "^1.1.0",
+"gulp-plumber": "^1.0.1",
 "gulp-rename": "^1.2.0",
-"gulp-serve": "^0.3.1",
-"gulp-sourcemaps": "^1.5.1",
-"gulp-stylus": "^2.0.1",
-"gulp-uglify": "^1.1.0",
-"gulp-util": "^3.0.4",
+"gulp-serve": "^1.2.0",
+"gulp-sourcemaps": "^1.6.0",
+"gulp-stylus": "^2.1.0",
+"gulp-uglify": "^1.5.1",
+"gulp-util": "^3.0.7",
 "jade": "^1.9.2",
 "jadeify": "^4.1.0",
-"main-bower-files": "^2.6.2",
+"main-bower-files": "^2.9.0",
--- End diff --

It is not an auto-generated file; it is basically the set of build utils used 
for the dashboard. Think "Maven build plugins". It is an important part of the 
setup and should not be git-ignored.

Something is strange here: this file should not be auto-changed when running the 
build tools. The changes to this file should not be committed; they have 
nothing to do with the re-building of the dashboard files.

I am wondering if someone ran a util command that tries to bump all 
dependencies to the latest version.
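
This suspicion can be checked against npm's caret (`^`) range rule:
- `^X.Y.Z` with X > 0 accepts versions from X.Y.Z up to, but excluding, the next major version.
- `^0.Y.Z` with Y > 0 stops before the next minor version.

Most edits in this hunk only raise the floor inside a range the old specifier already allowed (for example, `gulp` `^3.8.11` to `^3.9.0`). Three move outside it:
- `gulp-filter`: `^2.0.2` to `^3.0.1`
- `gulp-serve`: `^0.3.1` to `^1.2.0`
- `gulp-ng-annotate`: `^0.5.2` to `^1.1.0`

Below is a minimal Java sketch of the rule. It assumes plain MAJOR.MINOR.PATCH versions and ignores pre-release tags and other range syntax.

```java
/**
 * Minimal sketch of npm's caret-range rule for plain MAJOR.MINOR.PATCH versions
 * (pre-release tags and other range syntax are deliberately not handled).
 */
final class CaretRange {
    private CaretRange() { }

    static int[] parse(String version) {
        String[] parts = version.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected MAJOR.MINOR.PATCH: " + version);
        }
        return new int[] {
            Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), Integer.parseInt(parts[2])
        };
    }

    static int compare(int[] a, int[] b) {
        for (int i = 0; i < 3; i++) {
            if (a[i] != b[i]) {
                return Integer.compare(a[i], b[i]);
            }
        }
        return 0;
    }

    /** True if version satisfies range, where range looks like "^3.8.11". */
    static boolean satisfies(String range, String version) {
        if (!range.startsWith("^")) {
            throw new IllegalArgumentException("only caret ranges are supported: " + range);
        }
        int[] base = parse(range.substring(1));
        int[] v = parse(version);
        // Exclusive upper bound: bump the left-most non-zero component of the base.
        int[] upper;
        if (base[0] > 0) {
            upper = new int[] {base[0] + 1, 0, 0};
        } else if (base[1] > 0) {
            upper = new int[] {0, base[1] + 1, 0};
        } else {
            upper = new int[] {0, 0, base[2] + 1};
        }
        return compare(v, base) >= 0 && compare(v, upper) < 0;
    }
}
```

Range-crossing bumps like these are what a "bump everything to latest" tool produces. That fits the suspicion above, although the check alone cannot prove where the edit came from.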




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15035845#comment-15035845
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46418046
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---
@@ -227,4 +235,121 @@ public void testRequestPartitionState() throws Exception {
}
}};
}
+
+   @Test
+   public void testStopSignal() throws Exception {
+   new JavaTestKit(system) {{
+   // Setup
+   TestingCluster cluster = null;
+
+   try {
+   cluster = startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT());
+
+   // Create a task
+   final JobVertex sender = new JobVertex("Sender");
+   sender.setParallelism(2);
+   sender.setInvokableClass(StoppableInvokable.class);
+
+   final JobGraph jobGraph = new JobGraph("Blocking test job", JobType.STREAMING, sender);
+   final JobID jid = jobGraph.getJobID();
+
+   final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
+
+   // we can set the leader session ID to None because we don't use this gateway to send messages
+   final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+
+   // Submit the job and wait for all vertices to be running
+   jobManagerGateway.tell(
+   new SubmitJob(
+   jobGraph,
+   ListeningBehaviour.EXECUTION_RESULT),
+   testActorGateway);
+   expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+   jobManagerGateway.tell(new StopJob(jid), testActorGateway);
+
+   // - The test --
+   expectMsgClass(StoppingSuccess.class);
--- End diff --

Will take a look asap. I have to finish another task first.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15034069#comment-15034069
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46305433
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
@@ -732,6 +758,18 @@ else if (current == JobStatus.RESTARTING) {
}
}
 
+   public void stop() {
+   if(jobType == JobType.STREAMING) {
+   for(ExecutionVertex ev : this.getAllExecutionVertices()) {
+   if(ev.getNumberOfInputs() == 0) { // send signal to sources only
+   ev.stop();
+   }
+   }
+   } else {
+   throw new RuntimeException("STOP is only supported by streaming jobs.");
--- End diff --

I think it is a bit hard to throw a `RuntimeException` in the case of a 
batch job. This effectively means that whenever you call stop on a batch job, 
the `JobManager` will die with a `RuntimeException`. I think it is better to 
simply tell the user that stop is not supported for batch jobs or to call the 
cancel method for batch jobs.
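
One way to follow this suggestion is to report the rejection as a value instead of throwing. This is a minimal sketch with simplified stand-ins. `JobType`, `Vertex`, `StopResult` and `GraphSketch` are hypothetical here, not the Flink classes. It keeps the rule from the diff that only vertices without inputs (the sources) receive the signal.

```java
import java.util.List;

/** Hypothetical stand-in for the job type. */
enum JobType { BATCH, STREAMING }

/** Hypothetical stand-in for an execution vertex. */
class Vertex {
    final int numberOfInputs;
    boolean stopSignalReceived = false;

    Vertex(int numberOfInputs) { this.numberOfInputs = numberOfInputs; }

    void stop() { stopSignalReceived = true; }
}

/** Hypothetical outcome type: lets the caller tell the user why STOP was refused. */
final class StopResult {
    final boolean accepted;
    final String message;

    private StopResult(boolean accepted, String message) {
        this.accepted = accepted;
        this.message = message;
    }

    static StopResult ok() { return new StopResult(true, "Stop signal sent to source tasks."); }

    static StopResult rejected(String why) { return new StopResult(false, why); }
}

class GraphSketch {
    private final JobType jobType;
    private final List<Vertex> vertices;

    GraphSketch(JobType jobType, List<Vertex> vertices) {
        this.jobType = jobType;
        this.vertices = vertices;
    }

    StopResult stop() {
        if (jobType != JobType.STREAMING) {
            // No exception: the caller can relay this message to the user.
            return StopResult.rejected("STOP is only supported by streaming jobs; use cancel for batch jobs.");
        }
        for (Vertex v : vertices) {
            if (v.numberOfInputs == 0) { // send the signal to sources only
                v.stop();
            }
        }
        return StopResult.ok();
    }
}
```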




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15034090#comment-15034090
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46307098
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---
@@ -227,4 +235,121 @@ public void testRequestPartitionState() throws Exception {
}
}};
}
+
+   @Test
+   public void testStopSignal() throws Exception {
+   new JavaTestKit(system) {{
+   // Setup
+   TestingCluster cluster = null;
+
+   try {
+   cluster = startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT());
+
+   // Create a task
+   final JobVertex sender = new JobVertex("Sender");
+   sender.setParallelism(2);
+   sender.setInvokableClass(StoppableInvokable.class);
+
+   final JobGraph jobGraph = new JobGraph("Blocking test job", JobType.STREAMING, sender);
+   final JobID jid = jobGraph.getJobID();
+
+   final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
+
+   // we can set the leader session ID to None because we don't use this gateway to send messages
+   final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+
+   // Submit the job and wait for all vertices to be running
+   jobManagerGateway.tell(
+   new SubmitJob(
+   jobGraph,
+   ListeningBehaviour.EXECUTION_RESULT),
+   testActorGateway);
+   expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+   jobManagerGateway.tell(new StopJob(jid), testActorGateway);
+
+   // - The test --
+   expectMsgClass(StoppingSuccess.class);
--- End diff --

I tried to make this work, but without success... I added the following two 
lines:
```
jobManagerGateway.tell(new NotifyWhenJobRemoved(jid), testActorGateway);
expectMsgEquals(true);
```
But the returned message is `JobResultFailure` instead of `true` (I was 
looking at `JobManagerITCase.scala` in which `NotifyWhenJobRemoved` is also 
used and which expects `true` as return value).

As I am not familiar with Akka I need your input here.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15034120#comment-15034120
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46308707
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
@@ -732,6 +758,18 @@ else if (current == JobStatus.RESTARTING) {
}
}
 
+   public void stop() {
+   if(jobType == JobType.STREAMING) {
+   for(ExecutionVertex ev : this.getAllExecutionVertices()) {
+   if(ev.getNumberOfInputs() == 0) { // send signal to sources only
+   ev.stop();
+   }
+   }
+   } else {
+   throw new RuntimeException("STOP is only supported by streaming jobs.");
--- End diff --

This is not a problem: in `JobManager.scala`, a potential `Throwable` is 
caught when `ExecutionGraph.stop()` is called, and a proper `StoppingFailure` 
message is sent back as a response. I tested this in local mode: the job manager 
did not go down. Or is there any other place in the code where this could 
become a problem? As an alternative, we could introduce a regular (checked) 
exception (i.e., `StoppingException`) and add a `throws` clause to `stop()`. What 
do you think about this?
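
The two options discussed here can be sketched together. This is a simplified Java sketch, not the actual `JobManager.scala` code:
- `StoppingException` is the hypothetical checked exception proposed above.
- `StoppableGraph` is a hypothetical view of the execution graph.
- `StoppingSuccess` and `StoppingFailure` are reduced to plain classes named after the reply messages.

The handler turns both a declared rejection and an unexpected runtime exception into a failure reply. The comment describes catching `Throwable`; this sketch narrows the catch-all to `RuntimeException`.

```java
/** Hypothetical checked exception, as proposed in the comment above. */
class StoppingException extends Exception {
    StoppingException(String message) { super(message); }
}

/** Hypothetical view of the execution graph: stop() declares its failure mode. */
interface StoppableGraph {
    void stop() throws StoppingException;
}

/** Simplified reply messages (named after, but not identical to, Flink's). */
class StoppingSuccess {
    final String jobId;

    StoppingSuccess(String jobId) { this.jobId = jobId; }
}

class StoppingFailure {
    final String jobId;
    final Throwable cause;

    StoppingFailure(String jobId, Throwable cause) {
        this.jobId = jobId;
        this.cause = cause;
    }
}

class StopRequestHandler {
    /** Returns the reply that would be sent back to the client. */
    Object handleStop(String jobId, StoppableGraph graph) {
        try {
            graph.stop();
            return new StoppingSuccess(jobId);
        } catch (StoppingException e) {
            // Expected rejection, e.g. STOP on a batch job.
            return new StoppingFailure(jobId, e);
        } catch (RuntimeException e) {
            // Unexpected bug: still answer with a failure instead of letting it escape.
            return new StoppingFailure(jobId, e);
        }
    }
}
```

With the checked exception, the batch-job case becomes part of `stop()`'s signature, so every caller has to handle it explicitly. The `RuntimeException` branch keeps the catch-all safety net described above.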




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15034173#comment-15034173
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-161044122
  
Do I have to build the web resources as well? Because I don't see the stop 
button.

Thanks for your contribution @mjsax. The PR is in a good shape.

I think it would be good to add tests which test the REST stopping 
interface for a standalone cluster and a yarn cluster. Furthermore, the 
JobManager shouldn't fail with a `RuntimeException` when the cli frontend tries 
to stop a batch job.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15034189#comment-15034189
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-161045939
  
Actually, all changes from running `gulp` in 
`flink-runtime-web/web-dashboard` are committed, so there should be no need. I had 
similar problems because the WebUI caches files in `/tmp`; try cleaning it there 
(clearing the browser cache might also help).

About the `RuntimeException` and the CLI: that should already work as you wish. 
But please try it out again to verify. About the REST and YARN tests: I have never 
written a test like this. Can you give some pointers on how to do this?




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033449#comment-15033449
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46259453
  
--- Diff: docs/apis/cli.md ---
@@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs.
  -s,--scheduled    Show only scheduled programs and their JobIDs
 
 
+Action "stop" stops a running program (streaming jobs only).
+
+  Syntax: stop [OPTIONS] 
+  "stop" action options:
+ -m,--jobmanager    Address of the JobManager (master) to which
+   to connect. Specify 'yarn-cluster' as the
+   JobManager to deploy a YARN cluster for the
+   job. Use this flag to connect to a different
--- End diff --

It would be nice if @mjsax could remove the sentence: 
"Specify 'yarn-cluster' as the JobManager to deploy a YARN cluster for the 
job. "
from the description. The "stop" command is not about deploying a job ;)




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033847#comment-15033847
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46290038
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
@@ -112,19 +114,24 @@
 
/** The log object used for debugging. */
static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
-   
+
+   private static final int NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN = 1;
--- End diff --

IntelliJ says that this variable is never used.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033852#comment-15033852
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46290433
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---
@@ -227,6 +229,10 @@ public ExecutionState getAggregateState() {

return getAggregateJobVertexState(num, parallelism);
}
+
+   public boolean receivedStopSignal() {
--- End diff --

What do we need this method and the `receivedStopSignal` field for?




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033884#comment-15033884
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46292645
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
@@ -112,19 +114,24 @@
 
/** The log object used for debugging. */
static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
-   
+
+   private static final int NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN = 1;
--- End diff --

This seems to be a rebasing error (an incorrectly resolved merge conflict). I will remove it.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033853#comment-15033853
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46290465
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---
@@ -227,4 +235,121 @@ public void testRequestPartitionState() throws Exception {
}
}};
}
+
+   @Test
+   public void testStopSignal() throws Exception {
+   new JavaTestKit(system) {{
+   // Setup
+   TestingCluster cluster = null;
+
+   try {
+   cluster = startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT());
+
+   // Create a task
+   final JobVertex sender = new JobVertex("Sender");
+   sender.setParallelism(2);
+   sender.setInvokableClass(StoppableInvokable.class);
+
+   final JobGraph jobGraph = new JobGraph("Blocking test job", JobType.STREAMING, sender);
+   final JobID jid = jobGraph.getJobID();
+
+   final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
+
+   // we can set the leader session ID to None because we don't use this gateway to send messages
+   final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+
+   // Submit the job and wait for all vertices to be running
+   jobManagerGateway.tell(
+   new SubmitJob(
+   jobGraph,
+   ListeningBehaviour.EXECUTION_RESULT),
+   testActorGateway);
+   expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+   jobManagerGateway.tell(new StopJob(jid), testActorGateway);
+
+   // - The test --
+   expectMsgClass(StoppingSuccess.class);
--- End diff --

This might lead to a race condition. STOP only sets a flag telling the sources to return from `run()`; the job itself is still in state `RUNNING` until this happens, so it will not be removed from `currentJobs` until the sources (in fact, the whole job) have finished. If we check too early, this test fails. I would rather leave it as is; otherwise, we introduce an instability into this test. Let me know if you disagree.
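To illustrate the race outside Flink, here is a minimal, self-contained sketch. All names (`StoppableSource`, `stop()`, `StopRaceDemo.stopAndAwait`) are hypothetical stand-ins, not Flink's API, and a plain `Thread` stands in for the task. As described above, `stop()` only flips a `volatile` flag, so the thread executing `run()` may still be alive right after the call; a check must wait (bounded) for termination instead of asserting immediately.

```java
/** Hypothetical stand-in for a stoppable streaming source; not Flink's API. */
class StoppableSource implements Runnable {
    // volatile: the write in stop() must become visible to the thread executing run()
    private volatile boolean running = true;

    @Override
    public void run() {
        // Emit "records" until asked to stop; returning from run() is the clean shutdown.
        while (running) {
            Thread.yield(); // placeholder for emitting one record
        }
    }

    /** Only sets a flag; run() observes it and returns asynchronously. */
    public void stop() {
        running = false;
    }
}

public class StopRaceDemo {
    /** Starts a source thread, stops it, and waits up to timeoutMs for run() to return. */
    static boolean stopAndAwait(long timeoutMs) throws InterruptedException {
        StoppableSource source = new StoppableSource();
        Thread task = new Thread(source, "source-task");
        task.start();

        source.stop();
        // Asserting !task.isAlive() right here could fail: run() may not have seen the
        // flag yet. Waiting (bounded) for termination removes that race from the check.
        task.join(timeoutMs);
        return !task.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("stopped cleanly: " + stopAndAwait(10_000));
    }
}
```

In the JobManager test, the rough analogue of `isAlive()` would be the job still being listed in `currentJobs`, which is why asserting on its removal right after the stop acknowledgement could be flaky.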




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033864#comment-15033864
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46291411
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.taskmanager;
+
+import java.lang.reflect.Field;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.Stoppable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ TaskDeploymentDescriptor.class, JobID.class, FiniteDuration.class })
+public class TaskStopTest {
+   private AbstractInvokable taskMock;
+   private Task task;
+
+   public void doMocking() throws Exception {
+   TaskDeploymentDescriptor tddMock = mock(TaskDeploymentDescriptor.class);
+   when(tddMock.getNumberOfSubtasks()).thenReturn(1);
+   when(tddMock.getJobID()).thenReturn(mock(JobID.class));
+   when(tddMock.getVertexID()).thenReturn(mock(JobVertexID.class));
+   when(tddMock.getExecutionId()).thenReturn(mock(ExecutionAttemptID.class));
+   when(tddMock.getTaskName()).thenReturn("taskName");
+   when(tddMock.getJobConfiguration()).thenReturn(mock(Configuration.class));
+   when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class));
+   when(tddMock.getInvokableClassName()).thenReturn("className");
+
+   task = new Task(tddMock, mock(MemoryManager.class), mock(IOManager.class), mock(NetworkEnvironment.class),
+   mock(BroadcastVariableManager.class), mock(ActorGateway.class), mock(ActorGateway.class),
+   mock(FiniteDuration.class), mock(LibraryCacheManager.class), mock(FileCache.class),
+   mock(TaskManagerRuntimeInfo.class));
+   Field f = task.getClass().getDeclaredField("invokable");
+   f.setAccessible(true);
+   f.set(task, taskMock);
--- End diff --

The private member `Task.invokable` stays `null` until it is created and set in `Task.run()`, which is never called in these tests. That is why the test sets it via reflection.
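The reflection-based injection used in `doMocking()` can be sketched with plain JDK classes. `MiniTask`, `Invokable`, `StoppableInvokable`, and `stopExecution()` below are hypothetical stand-ins assumed for illustration only; they mirror the idea of forwarding a stop request only to stoppable task bodies, not Flink's actual classes or signatures.

```java
import java.lang.reflect.Field;

/** Hypothetical task body (stands in for an invokable). */
interface Invokable {
    void invoke() throws Exception;
}

/** Hypothetical marker for task bodies that support a graceful stop. */
interface StoppableInvokable extends Invokable {
    void stop();
}

/** Hypothetical task whose invokable is only created inside run(). */
class MiniTask {
    private Invokable invokable; // stays null until run() is called

    public void run() throws Exception {
        invokable = () -> { /* user code would be loaded and executed here */ };
        invokable.invoke();
    }

    /** Forwards a stop request only if the current invokable supports it. */
    public boolean stopExecution() {
        Invokable current = invokable;
        if (current instanceof StoppableInvokable) {
            ((StoppableInvokable) current).stop();
            return true;
        }
        return false;
    }
}

public class ReflectionInjectionDemo {
    /** Writes a private field directly, bypassing the normal initialization path. */
    static void setPrivateField(Object target, String name, Object value)
            throws ReflectiveOperationException {
        Field f = target.getClass().getDeclaredField(name);
        f.setAccessible(true);
        f.set(target, value);
    }

    /** Test double that records whether stop() was called. */
    static final class RecordingInvokable implements StoppableInvokable {
        boolean stopped;

        @Override
        public void invoke() {}

        @Override
        public void stop() {
            stopped = true;
        }
    }

    public static void main(String[] args) throws Exception {
        MiniTask task = new MiniTask();
        System.out.println(task.stopExecution()); // false: run() never ran, invokable is null

        RecordingInvokable fake = new RecordingInvokable();
        setPrivateField(task, "invokable", fake);
        System.out.println(task.stopExecution() + " " + fake.stopped); // true true
    }
}
```

Injecting the field avoids running `run()`, which in a real task would try to load and execute user code; the trade-off is that the test depends on a private field name.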



[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033856#comment-15033856
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46290688
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -420,6 +420,23 @@ class JobManager(
   )
   }
 
+case StopJob(jobID) =>
+  log.info(s"Trying to stop job withID $jobID.")
--- End diff --

Missing whitespace: `withID` should be `with ID`.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033889#comment-15033889
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46293301
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---
@@ -227,6 +229,10 @@ public ExecutionState getAggregateState() {

return getAggregateJobVertexState(num, parallelism);
}
+
+   public boolean receivedStopSignal() {
--- End diff --

The old web UI used it (that code was deleted when we switched to the new UI), so I guess it is obsolete now. I will remove it.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033917#comment-15033917
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46294804
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobType;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.api.mockito.PowerMockito;
+
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.same;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionGraph.class)
+public class ExecutionGraphSignalsTest {
+   private ExecutionJobVertex[] mockEJV = new ExecutionJobVertex[5];
+   private int[] dop = new int[] { 5, 7, 2, 11, 4 };
+   private ExecutionVertex[][] mockEV = new ExecutionVertex[mockEJV.length][];
+   private ExecutionGraph eg;
+   private Field f;
+
+   @Before
+   public void prepare() throws Exception {
+   final JobID jobId = new JobID();
+   final String jobName = "Test Job Sample Name";
+   final Configuration cfg = new Configuration();
+
+
+   assert (mockEJV.length == 5);
+   JobVertex v1 = new JobVertex("vertex1");
+   JobVertex v2 = new JobVertex("vertex2");
+   JobVertex v3 = new JobVertex("vertex3");
+   JobVertex v4 = new JobVertex("vertex4");
+   JobVertex v5 = new JobVertex("vertex5");
+
+   for(int i = 0; i < mockEJV.length; ++i) {
+   mockEJV[i] = mock(ExecutionJobVertex.class);
+
+   this.mockEV[i] = new ExecutionVertex[dop[i]];
+   for (int j = 0; j < dop[i]; ++j) {
+   this.mockEV[i][j] = mock(ExecutionVertex.class);
+   }
+
+   when(mockEJV[i].getProducedDataSets()).thenReturn(new IntermediateResult[0]);
+   when(mockEJV[i].getTaskVertices()).thenReturn(this.mockEV[i]);
+   }
+
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+   .withArguments(any(ExecutionGraph.class), same(v1), any(Integer.class).intValue(),
+   any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[0]);
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+   .withArguments(any(ExecutionGraph.class), same(v2), any(Integer.class).intValue(),
+   any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[1]);
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+ 

[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033923#comment-15033923
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46295358
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+public class ExecutionJobVertexTest {
+
+   @Test
+   public void testReceivedStopSignal() throws JobException {
--- End diff --

This is obsolete now. I removed the whole test because I reverted my changes to `ExecutionJobVertex`. Initially, I only added a test for the code I had added to `ExecutionJobVertex`. Of course it would be nice to test more, but I felt that would be out of scope for this PR.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033785#comment-15033785
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46285844
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobType;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.api.mockito.PowerMockito;
+
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.same;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionGraph.class)
+public class ExecutionGraphSignalsTest {
+   private ExecutionJobVertex[] mockEJV = new ExecutionJobVertex[5];
+   private int[] dop = new int[] { 5, 7, 2, 11, 4 };
+   private ExecutionVertex[][] mockEV = new ExecutionVertex[mockEJV.length][];
+   private ExecutionGraph eg;
+   private Field f;
+
+   @Before
+   public void prepare() throws Exception {
+   final JobID jobId = new JobID();
+   final String jobName = "Test Job Sample Name";
+   final Configuration cfg = new Configuration();
+
+
+   assert (mockEJV.length == 5);
+   JobVertex v1 = new JobVertex("vertex1");
+   JobVertex v2 = new JobVertex("vertex2");
+   JobVertex v3 = new JobVertex("vertex3");
+   JobVertex v4 = new JobVertex("vertex4");
+   JobVertex v5 = new JobVertex("vertex5");
+
+   for(int i = 0; i < mockEJV.length; ++i) {
+   mockEJV[i] = mock(ExecutionJobVertex.class);
+
+   this.mockEV[i] = new ExecutionVertex[dop[i]];
+   for (int j = 0; j < dop[i]; ++j) {
+   this.mockEV[i][j] = mock(ExecutionVertex.class);
+   }
+
+   when(mockEJV[i].getProducedDataSets()).thenReturn(new IntermediateResult[0]);
+   when(mockEJV[i].getTaskVertices()).thenReturn(this.mockEV[i]);
+   }
+
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+   .withArguments(any(ExecutionGraph.class), same(v1), any(Integer.class).intValue(),
+   any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[0]);
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+   .withArguments(any(ExecutionGraph.class), same(v2), any(Integer.class).intValue(),
+   any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[1]);
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+  

[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033787#comment-15033787
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46286009
  
--- Diff: flink-runtime-web/web-dashboard/package.json ---
@@ -7,27 +7,27 @@
   "devDependencies": {
 "browserify": "^9.0.3",
 "coffeeify": "^1.0.0",
-"gulp": "^3.8.11",
+"gulp": "^3.9.0",
 "gulp-browserify": "^0.5.1",
 "gulp-coffee": "^2.3.1",
 "gulp-coffeeify": "^0.1.8",
-"gulp-concat": "^2.5.2",
-"gulp-filter": "^2.0.2",
-"gulp-jade": "^1.0.0",
-"gulp-less": "^3.0.2",
-"gulp-livereload": "^3.8.0",
-"gulp-minify-css": "^1.0.0",
-"gulp-ng-annotate": "^0.5.2",
-"gulp-plumber": "^1.0.0",
+"gulp-concat": "^2.6.0",
+"gulp-filter": "^3.0.1",
+"gulp-jade": "^1.1.0",
+"gulp-less": "^3.0.5",
+"gulp-livereload": "^3.8.1",
+"gulp-minify-css": "^1.2.1",
+"gulp-ng-annotate": "^1.1.0",
+"gulp-plumber": "^1.0.1",
 "gulp-rename": "^1.2.0",
-"gulp-serve": "^0.3.1",
-"gulp-sourcemaps": "^1.5.1",
-"gulp-stylus": "^2.0.1",
-"gulp-uglify": "^1.1.0",
-"gulp-util": "^3.0.4",
+"gulp-serve": "^1.2.0",
+"gulp-sourcemaps": "^1.6.0",
+"gulp-stylus": "^2.1.0",
+"gulp-uglify": "^1.5.1",
+"gulp-util": "^3.0.7",
 "jade": "^1.9.2",
 "jadeify": "^4.1.0",
-"main-bower-files": "^2.6.2",
+"main-bower-files": "^2.9.0",
--- End diff --

The file was added by @StephanEwen; it got updated when I ran `gulp`, so I just committed the changes. To me, it would make sense to remove the file (I am not sure why it is there) and maybe add it to `.gitignore` after removal.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033797#comment-15033797
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46286476
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+public class ExecutionJobVertexTest {
+
+   @Test
+   public void testReceivedStopSignal() throws JobException {
--- End diff --

I didn't ask you to drop it, since it is already there. However, I question whether it makes sense to write tests for setters/getters that don't involve complex logic. Effectively, the test tests

```
public void setStopSignalReceived() {
    this.receivedStopSignal = true;
}
```

which is trivial. If we start testing these kinds of methods, I wonder why we don't test the other getters as well. From a test coverage point of view, it seems somewhat arbitrary to test only this setter.


> Add "stop" signal to cleanly shutdown streaming jobs
> 
>
> Key: FLINK-2111
> URL: https://issues.apache.org/jira/browse/FLINK-2111
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime, JobManager, Local Runtime, 
> Streaming, TaskManager, Webfrontend
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> Currently, streaming jobs can only be stopped using "cancel" command, what is 
> a "hard" stop with no clean shutdown.
> The new introduced "stop" signal, will only affect streaming source tasks 
> such that the sources can stop emitting data and shutdown cleanly, resulting 
> in a clean shutdown of the whole streaming job.
> This feature is a pre-requirment for 
> https://issues.apache.org/jira/browse/FLINK-1929
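The difference between the two signals can be sketched at the level of a single source: where "cancel" tears the task down, "stop" only asks the source loop to finish, so `run()` returns normally and the downstream tasks can shut down on their own. The classes below are a hypothetical, dependency-free illustration, not Flink's source-function API:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stoppable source (not Flink's API): an unbounded emit loop
// guarded by a volatile flag. stop() only flips the flag, so run() returns
// normally instead of being torn down the way a hard cancel would be.
class StoppableCounterSource {
    private volatile boolean running = true;
    private final AtomicLong emitted = new AtomicLong();

    void run() {
        while (running) {
            emitted.incrementAndGet(); // stands in for emitting one record
            Thread.yield();
        }
        // Leaving the loop is the clean-shutdown path: nothing is interrupted.
    }

    void stop() {
        running = false;
    }

    long emittedCount() {
        return emitted.get();
    }
}

class StoppableSourceDemo {
    // Starts the source, waits for the first record, sends "stop", and
    // reports whether run() finished on its own within five seconds.
    static boolean stopsCleanly() {
        StoppableCounterSource source = new StoppableCounterSource();
        Thread task = new Thread(source::run);
        task.setDaemon(true); // never keep the JVM alive if something goes wrong
        task.start();
        try {
            while (source.emittedCount() == 0) {
                Thread.sleep(1);
            }
            source.stop();
            task.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !task.isAlive();
    }
}
```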



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033812#comment-15033812
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46287242
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
 ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobType;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.api.mockito.PowerMockito;
+
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.same;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionGraph.class)
+public class ExecutionGraphSignalsTest {
+   private ExecutionJobVertex[] mockEJV = new ExecutionJobVertex[5];
+   private int[] dop = new int[] { 5, 7, 2, 11, 4 };
+   private ExecutionVertex[][] mockEV = new ExecutionVertex[mockEJV.length][];
+   private ExecutionGraph eg;
+   private Field f;
+
+   @Before
+   public void prepare() throws Exception {
+   final JobID jobId = new JobID();
+   final String jobName = "Test Job Sample Name";
+   final Configuration cfg = new Configuration();
+
+
+   assert (mockEJV.length == 5);
+   JobVertex v1 = new JobVertex("vertex1");
+   JobVertex v2 = new JobVertex("vertex2");
+   JobVertex v3 = new JobVertex("vertex3");
+   JobVertex v4 = new JobVertex("vertex4");
+   JobVertex v5 = new JobVertex("vertex5");
+
+   for(int i = 0; i < mockEJV.length; ++i) {
+   mockEJV[i] = mock(ExecutionJobVertex.class);
+
+   this.mockEV[i] = new ExecutionVertex[dop[i]];
+   for (int j = 0; j < dop[i]; ++j) {
+   this.mockEV[i][j] = mock(ExecutionVertex.class);
+   }
+
+   when(mockEJV[i].getProducedDataSets()).thenReturn(new IntermediateResult[0]);
+   when(mockEJV[i].getTaskVertices()).thenReturn(this.mockEV[i]);
+   }
+
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+   .withArguments(any(ExecutionGraph.class), same(v1), any(Integer.class).intValue(),
+   any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[0]);
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+   .withArguments(any(ExecutionGraph.class), same(v2), any(Integer.class).intValue(),
+   any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[1]);
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+  

[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033772#comment-15033772
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46285044
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import java.util.UUID;
+
+import akka.actor.*;
+import akka.testkit.JavaTestKit;
+
+import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import scala.Option;
+import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
+import static org.junit.Assert.*;
+
+public class CliFrontendStopTest {
+
+   private static ActorSystem actorSystem;
+
+   @BeforeClass
+   public static void setup() {
+   pipeSystemOutToNull();
+   actorSystem = ActorSystem.create("TestingActorSystem");
+   }
+
+   @AfterClass
+   public static void teardown() {
+   JavaTestKit.shutdownActorSystem(actorSystem);
+   actorSystem = null;
+   }
+
+   @BeforeClass
+   public static void init() {
--- End diff --

To me, it doesn't make sense to have multiple `@BeforeClass` methods. IMO, 
you should change it here and address `CliFrontendCancelTest` in a separate 
PR.
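JUnit 4 does not document an execution order for several `@BeforeClass` methods in the same class, so merging them into one method makes the setup sequence explicit. A minimal, JUnit-free sketch of the consolidated fixture; `StopTestFixture`, `setupOnce()` and `teardownOnce()` are hypothetical names, what `pipeSystemOutToNull()` does is assumed from its name, and the body of the diff's `init()` is not shown, so step 3 is only a placeholder:

```java
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, JUnit-free sketch of one consolidated class-level setup.
// In the real test, setupOnce() would carry the single @BeforeClass
// annotation and teardownOnce() the single @AfterClass annotation.
class StopTestFixture {
    static final List<String> steps = new ArrayList<>();
    static Object actorSystem; // placeholder for the Akka ActorSystem
    private static PrintStream originalOut;

    static void setupOnce() {
        // 1. Silence stdout (assumed to be what pipeSystemOutToNull() does).
        originalOut = System.out;
        System.setOut(new PrintStream(new OutputStream() {
            @Override
            public void write(int b) {
                // discard all output
            }
        }));
        steps.add("pipeSystemOutToNull");

        // 2. Create shared resources; the order is now explicit.
        actorSystem = new Object();
        steps.add("createActorSystem");

        // 3. The body of the former init() method would follow here.
        steps.add("init");
    }

    static void teardownOnce() {
        actorSystem = null;
        System.setOut(originalOut);
        steps.add("teardown");
    }
}
```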




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033779#comment-15033779
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46285468
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -402,21 +405,53 @@ public void onComplete(Throwable failure, Object success) throws Throwable {
}
}
 
+   public void stop() {
+   // sends stop RPC call
+
+   final SimpleSlot slot = this.assignedResource;
+
+   if (slot != null) {
+   final ActorGateway gateway = slot.getInstance().getActorGateway();
+
+   Future stopResult = gateway.retry(
+   new StopTask(attemptId),
+   NUM_STOP_CALL_TRIES,
+   timeout,
+   executionContext);
+
+   stopResult.onComplete(new OnComplete() {
+   @Override
+   public void onComplete(Throwable failure, Object success) throws Throwable {
+   if (failure != null) {
+   fail(new Exception("Task could not be stopped.", failure));
+   } else {
+   TaskOperationResult result = (TaskOperationResult) success;
+   if (!result.success()) {
+   LOG.debug("Stopping task call did not find task. Probably akka message call race.");
--- End diff --

Again, sticking to antiquated traditions is, IMHO, not a good option if one 
can improve them. Including the change to `cancel`'s `onComplete` would be OK 
for me here, but in general, orthogonal changes should go into a separate PR.
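The control flow under review (send the stop RPC with retries, then handle failure, "task not found", and success in one callback) can be sketched without Akka. This is a hypothetical illustration: `CompletableFuture` stands in for both the Akka future and `gateway.retry(...)`, and `StopCallSketch`, `StopResult`, `retry` and the returned strings are invented for the example:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch of the stop RPC flow in Execution.stop(), with
// CompletableFuture swapped in for Akka futures and gateway.retry(...).
class StopCallSketch {
    // Stand-in for TaskOperationResult.
    static final class StopResult {
        final boolean success;

        StopResult(boolean success) {
            this.success = success;
        }
    }

    // Re-issues the asynchronous call until it completes normally or the
    // attempts are used up; the last failure is propagated.
    static CompletableFuture<StopResult> retry(Supplier<CompletableFuture<StopResult>> call, int attempts) {
        CompletableFuture<StopResult> result = new CompletableFuture<>();
        attempt(call, attempts, result);
        return result;
    }

    private static void attempt(Supplier<CompletableFuture<StopResult>> call, int remaining,
                                CompletableFuture<StopResult> result) {
        call.get().whenComplete((value, failure) -> {
            if (failure == null) {
                result.complete(value);
            } else if (remaining > 1) {
                attempt(call, remaining - 1, result);
            } else {
                result.completeExceptionally(failure);
            }
        });
    }

    // One callback, three outcomes, as in the reviewed onComplete().
    static String stop(Supplier<CompletableFuture<StopResult>> call, int tries) {
        return retry(call, tries).handle((result, failure) -> {
            if (failure != null) {
                return "failed: task could not be stopped";
            }
            if (!result.success) {
                return "debug: task not found (likely a message race)";
            }
            return "stopped";
        }).join();
    }
}
```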




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033799#comment-15033799
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46286695
  
--- Diff: docs/apis/cli.md ---
@@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs.
  -s,--scheduled   Show only scheduled programs and their JobIDs
 
 
+Action "stop" stops a running program (streaming jobs only).
+
+  Syntax: stop [OPTIONS] 
+  "stop" action options:
+ -m,--jobmanager   Address of the JobManager (master) to which
+   to connect. Specify 'yarn-cluster' as the
+   JobManager to deploy a YARN cluster for the
+   job. Use this flag to connect to a different
--- End diff --

:+1:




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033810#comment-15033810
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46287141
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -569,6 +571,69 @@ public int compare(JobStatusMessage o1, JobStatusMessage o2) {
}
 
/**
+* Executes the STOP action.
+* 
+* @param args Command line arguments for the stop action.
+*/
+   protected int stop(String[] args) {
+   LOG.info("Running 'stop' command.");
+
+   StopOptions options;
+   try {
+   options = CliFrontendParser.parseStopCommand(args);
+   }
+   catch (CliArgsException e) {
+   return handleArgException(e);
+   }
+   catch (Throwable t) {
+   return handleError(t);
+   }
+
+   // evaluate help flag
+   if (options.isPrintHelp()) {
+   CliFrontendParser.printHelpForStop();
+   return 0;
+   }
+
+   String[] stopArgs = options.getArgs();
+   JobID jobId;
+
+   if (stopArgs.length > 0) {
+   String jobIdString = stopArgs[0];
+   try {
+   jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
+   }
+   catch (Exception e) {
+   LOG.error("Error: The value for the Job ID is not a valid ID.");
--- End diff --

I will change it only here. (I'm just not a big fan of this, as the other code 
will potentially stay "old" forever. Nobody rewrites it "just for fun"; if we 
don't address it now, while we are aware of the old pattern, we will forget to 
update the other parts.)
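For reference, the argument handling in the new `stop` action can be sketched without the Flink client classes. Everything here is hypothetical: `StopArgsSketch`, the exit codes and the "no Job ID" message are illustrative, the hex decoding stands in for `StringUtils.hexStringToByte`, and the 16-byte length assumes a `JobID` is built from 32 hex characters. Errors are simply printed to `System.err`, since where they should be reported is exactly what the thread is discussing:

```java
// Hypothetical sketch of the stop action's argument handling (not the real
// CliFrontend): decode the first argument as a hex job ID and map each
// outcome to an exit code instead of throwing.
class StopArgsSketch {
    // Assumption: a job ID is 16 bytes, i.e. 32 hex characters.
    static final int JOB_ID_BYTES = 16;

    // Decodes a hex string, rejecting odd lengths and non-hex characters.
    static byte[] hexToBytes(String hex) {
        if (hex.length() % 2 != 0) {
            throw new IllegalArgumentException("odd number of hex digits");
        }
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            int high = Character.digit(hex.charAt(2 * i), 16);
            int low = Character.digit(hex.charAt(2 * i + 1), 16);
            if (high < 0 || low < 0) {
                throw new IllegalArgumentException("not a hex digit");
            }
            out[i] = (byte) ((high << 4) | low);
        }
        return out;
    }

    // Returns 0 when a well-formed job ID was given, 1 otherwise.
    static int stop(String[] args) {
        if (args.length == 0) {
            System.err.println("Error: no Job ID specified.");
            return 1;
        }
        try {
            if (hexToBytes(args[0]).length != JOB_ID_BYTES) {
                throw new IllegalArgumentException("wrong length");
            }
        } catch (IllegalArgumentException e) {
            System.err.println("Error: The value for the Job ID is not a valid ID.");
            return 1;
        }
        // The real frontend would now send the stop message to the JobManager.
        return 0;
    }
}
```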




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033807#comment-15033807
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46287079
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.BaseTestingActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import scala.concurrent.ExecutionContext;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionVertex.class)
+public class ExecutionVertexStopTest {
+
+   @Test
+   public void testStop() throws Exception {
--- End diff --

I guess you could also test that in a somewhat broader context, but that's 
more for the future, since you've already written the test.
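A test in a broader context could, for example, check that a graph-level stop reaches every parallel subtask. The dependency-free sketch below models the path from ExecutionGraph through ExecutionJobVertex and ExecutionVertex down to Execution with hypothetical `Sketch*` classes, reusing the parallelisms `{ 5, 7, 2, 11, 4 }` from `ExecutionGraphSignalsTest`. It assumes the graph forwards the stop to every vertex; the real runtime may restrict this to source tasks:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-ins for the runtime classes; only the
// stop path is modeled.
class SketchExecution {
    private final AtomicInteger stopCalls;

    SketchExecution(AtomicInteger stopCalls) {
        this.stopCalls = stopCalls;
    }

    void stop() {
        stopCalls.incrementAndGet(); // the real class would send the StopTask RPC
    }
}

class SketchVertex {
    private final SketchExecution currentExecution;

    SketchVertex(SketchExecution currentExecution) {
        this.currentExecution = currentExecution;
    }

    void stop() {
        currentExecution.stop();
    }
}

class SketchJobVertex {
    final List<SketchVertex> taskVertices = new ArrayList<>();
}

class SketchGraph {
    final List<SketchJobVertex> jobVertices = new ArrayList<>();

    // Forwards the stop to every parallel subtask of every job vertex.
    void stop() {
        for (SketchJobVertex jobVertex : jobVertices) {
            for (SketchVertex vertex : jobVertex.taskVertices) {
                vertex.stop();
            }
        }
    }

    // Builds one job vertex per entry in dop, all sharing one call counter.
    static SketchGraph withParallelism(int[] dop, AtomicInteger counter) {
        SketchGraph graph = new SketchGraph();
        for (int parallelism : dop) {
            SketchJobVertex jobVertex = new SketchJobVertex();
            for (int i = 0; i < parallelism; i++) {
                jobVertex.taskVertices.add(new SketchVertex(new SketchExecution(counter)));
            }
            graph.jobVertices.add(jobVertex);
        }
        return graph;
    }
}
```

With parallelisms 5, 7, 2, 11 and 4, one graph-level stop should produce 29 calls, one per subtask.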




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033685#comment-15033685
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46277994
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
 ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobType;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.api.mockito.PowerMockito;
+
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.same;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionGraph.class)
+public class ExecutionGraphSignalsTest {
+   private ExecutionJobVertex[] mockEJV = new ExecutionJobVertex[5];
+   private int[] dop = new int[] { 5, 7, 2, 11, 4 };
+   private ExecutionVertex[][] mockEV = new ExecutionVertex[mockEJV.length][];
+   private ExecutionGraph eg;
+   private Field f;
+
+   @Before
+   public void prepare() throws Exception {
+   final JobID jobId = new JobID();
+   final String jobName = "Test Job Sample Name";
+   final Configuration cfg = new Configuration();
+
+
+   assert (mockEJV.length == 5);
+   JobVertex v1 = new JobVertex("vertex1");
+   JobVertex v2 = new JobVertex("vertex2");
+   JobVertex v3 = new JobVertex("vertex3");
+   JobVertex v4 = new JobVertex("vertex4");
+   JobVertex v5 = new JobVertex("vertex5");
+
+   for(int i = 0; i < mockEJV.length; ++i) {
+   mockEJV[i] = mock(ExecutionJobVertex.class);
+
+   this.mockEV[i] = new ExecutionVertex[dop[i]];
+   for (int j = 0; j < dop[i]; ++j) {
+   this.mockEV[i][j] = mock(ExecutionVertex.class);
+   }
+
+   when(mockEJV[i].getProducedDataSets()).thenReturn(new IntermediateResult[0]);
+   when(mockEJV[i].getTaskVertices()).thenReturn(this.mockEV[i]);
+   }
+
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+   .withArguments(any(ExecutionGraph.class), same(v1), any(Integer.class).intValue(),
+   any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[0]);
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+   .withArguments(any(ExecutionGraph.class), same(v2), any(Integer.class).intValue(),
+   any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[1]);
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+  

[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033704#comment-15033704
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46279382
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.BaseTestingActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import scala.concurrent.ExecutionContext;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionVertex.class)
+public class ExecutionVertexStopTest {
+
+   @Test
+   public void testStop() throws Exception {
--- End diff --

To be honest, I'm not so sure whether this test adds much value. It 
basically tests that calling `ExecutionVertex.stop()`, which is implemented as

```
public void stop() {
    this.currentExecution.stop();
}
```

results in a call to `Execution.stop()`.




[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033743#comment-15033743
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46283023
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.BaseTestingActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import scala.concurrent.ExecutionContext;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionVertex.class)
+public class ExecutionVertexStopTest {
+
+   @Test
+   public void testStop() throws Exception {
+   final JobVertexID jid = new JobVertexID();
+   final ExecutionJobVertex ejv = getExecutionVertex(jid);
+
+   Execution executionMock = mock(Execution.class);
+   whenNew(Execution.class).withAnyArguments().thenReturn(executionMock);
+
+   final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+   AkkaUtils.getDefaultTimeout());
+
+   vertex.stop();
+
+   verify(executionMock).stop();
+   }
+
+   private static ActorSystem system;
+
+   @AfterClass
+   public static void teardown(){
+   if(system != null) {
+   JavaTestKit.shutdownActorSystem(system);
+   system = null;
+   }
+   }
+
+   static boolean receivedStopSignal;
--- End diff --

Ok, will do. (For a test, I thought it would be better to have it next to the 
single test method where it is used.)
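The `testStop` case above only checks that `ExecutionVertex.stop()` forwards the signal to its `Execution`; PowerMock's `whenNew(...)` is needed because the vertex creates that `Execution` itself. For comparison, here is a minimal sketch of the same delegation check using constructor injection and a hand-rolled fake instead of bytecode manipulation. `StoppableExecution`, `SimpleExecutionVertex`, `RecordingExecution` and `DelegationCheck` are hypothetical, simplified stand-ins, not Flink's runtime classes.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-ins for Flink's Execution and ExecutionVertex.
// They only model the delegation that testStop() verifies.
interface StoppableExecution {
    void stop();
}

final class SimpleExecutionVertex {
    private final StoppableExecution currentExecution;

    // The execution is injected instead of being created with `new` inside
    // the constructor, so a test can pass a fake without whenNew(...).
    SimpleExecutionVertex(StoppableExecution currentExecution) {
        this.currentExecution = currentExecution;
    }

    // Forwards the stop signal to the current execution attempt.
    void stop() {
        currentExecution.stop();
    }
}

// Hand-rolled test double that counts stop() calls, playing the role of
// Mockito's verify(executionMock).stop().
final class RecordingExecution implements StoppableExecution {
    final AtomicInteger stopCalls = new AtomicInteger();

    @Override
    public void stop() {
        stopCalls.incrementAndGet();
    }
}

final class DelegationCheck {
    // Returns how many stop() calls reached the execution after vertex.stop().
    static int stopCallsAfterVertexStop() {
        RecordingExecution execution = new RecordingExecution();
        SimpleExecutionVertex vertex = new SimpleExecutionVertex(execution);
        vertex.stop();
        return execution.stopCalls.get();
    }
}
```

The trade-off is that production code would have to accept the execution from outside, which may not fit how the real `ExecutionVertex` manages its execution attempts; that is presumably why the test relies on PowerMock instead.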



[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15033686#comment-15033686
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46278142
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+public class ExecutionJobVertexTest {
+
+   @Test
+   public void testReceivedStopSignal() throws JobException {
--- End diff --

I'm not sure whether we really need this test. It basically tests the 
setter, right?


> Add "stop" signal to cleanly shutdown streaming jobs
> 
>
> Key: FLINK-2111
> URL: https://issues.apache.org/jira/browse/FLINK-2111
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime, JobManager, Local Runtime, 
> Streaming, TaskManager, Webfrontend
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> Currently, streaming jobs can only be stopped using the "cancel" command, which is 
> a "hard" stop with no clean shutdown.
> The newly introduced "stop" signal will only affect streaming source tasks, 
> so that the sources can stop emitting data and shut down cleanly, resulting 
> in a clean shutdown of the whole streaming job.
> This feature is a prerequisite for 
> https://issues.apache.org/jira/browse/FLINK-1929
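The description above boils down to one idea: a stopped source leaves its emit loop and returns normally, whereas "cancel" tears the task down. A minimal single-threaded sketch of that contract, assuming a hypothetical `StoppableSource` interface (not Flink's actual API) and a hypothetical unbounded `CounterSource`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical interface modelling a graceful "stop" signal for sources.
// It is a sketch, not Flink's actual API.
interface StoppableSource {
    // Emits records to the collector until stop() is called.
    void run(LongConsumer collector);

    // Asks the source to stop emitting; run() should then return normally.
    void stop();
}

// Unbounded counter source: emits 0, 1, 2, ... until stopped.
final class CounterSource implements StoppableSource {
    // volatile so that a stop() issued from another thread is seen by run().
    private volatile boolean running = true;

    @Override
    public void run(LongConsumer collector) {
        long next = 0;
        while (running) {
            collector.accept(next++);
        }
        // Returning normally (rather than being interrupted, as with "cancel")
        // lets downstream operators see the end of the stream and finish.
    }

    @Override
    public void stop() {
        running = false;
    }
}

final class StopDemo {
    // Delivers the stop signal from inside the collector after `limit` records,
    // which keeps the demo deterministic and single-threaded.
    static List<Long> emitUntilStopped(int limit) {
        if (limit < 1) {
            throw new IllegalArgumentException("limit must be at least 1");
        }
        CounterSource source = new CounterSource();
        List<Long> received = new ArrayList<>();
        source.run(value -> {
            received.add(value);
            if (received.size() == limit) {
                source.stop();
            }
        });
        return received;
    }
}
```

Because the stop signal is issued from inside the collector, `StopDemo.emitUntilStopped(5)` returns the records 0 through 4. In a running job the signal would arrive from another thread, which is why the flag is `volatile`.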




