[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-15 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-184254082
  
Thanks for merging! Very happy about it!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/750




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-15 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-184171212
  
Thanks for the review @StephanEwen. I will take care of your comments and 
then merge the PR :-)




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-15 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-184165224
  
This looks pretty good by now. I have a few remaining comments, otherwise 
this looks good to go:

  1. There is a duplicate and incorrect test dependency in `flink-tests` 
(critical, will break the build later)

  2. There are two types of exceptions:
  - ProgramStopException
  - StoppingException 
 Both simply extend `Exception`. Can we consolidate these into one 
class?

  3. The JobManager tries to stop the job in state `CREATED`, `RUNNING`, 
and `RESTARTING`. Are we sure that the states `CREATED` and `RESTARTING` behave 
well?

  4. Minor comment: Adding the generic `SRC` parameter to the 
`SourceStreamTask` looks like a bit of overkill for only adding the 
`StoppableFunction` interface to the generic function type. A simple cast in 
`StoppableSourceStreamTask.stop()` would probably be the more lightweight 
change.
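
To make point 4 concrete, here is a minimal sketch of the cast-based variant. The types below are simplified stand-ins, not the actual Flink classes, and `FlagSource` is a hypothetical source added only to exercise the task. The trade-off is that the check moves to runtime: a function that is not stoppable fails with a `ClassCastException` when `stop()` is called.

```java
// Simplified stand-ins for illustration; these are not the real Flink classes.
interface SourceFunction<T> {
    void run();
}

interface StoppableFunction {
    void stop();
}

// The base task is generic only in the output type; no extra SRC parameter.
class SourceStreamTask<OUT> {
    protected final SourceFunction<OUT> userFunction;

    SourceStreamTask(SourceFunction<OUT> userFunction) {
        this.userFunction = userFunction;
    }
}

class StoppableSourceStreamTask<OUT> extends SourceStreamTask<OUT> {
    StoppableSourceStreamTask(SourceFunction<OUT> userFunction) {
        super(userFunction);
    }

    // The cast is checked at runtime only: a non-stoppable function
    // fails here with a ClassCastException.
    void stop() {
        ((StoppableFunction) userFunction).stop();
    }
}

// Hypothetical source used to exercise the task.
class FlagSource implements SourceFunction<Long>, StoppableFunction {
    volatile boolean running = true;

    @Override
    public void run() {
        while (running) {
            Thread.yield();
        }
    }

    @Override
    public void stop() {
        running = false;
    }
}
```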




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52883504
  
--- Diff: flink-tests/pom.xml ---
@@ -141,6 +149,14 @@ under the License.
 

org.apache.flink
+   flink-runtime-web
--- End diff --

Duplicate dependency. Also, this dependency misses the Scala version suffix.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-12 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-183389629
  
Let us squash them, once we merge the PR. Otherwise changes are hard to 
track. 




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-12 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-183390556
  
Alright, will do.

On Fri, Feb 12, 2016 at 5:10 PM, Matthias J. Sax wrote:

> About vendor.css etc (I guess I did not commit after the last run of
> gulp), so the committed version are dirty... It worked for me -- I guess it
> would be required that you test it again after I committed.





[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-12 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-183382789
  
I will merge your PR and take care of the other issues you mentioned. 
Should I squash afterwards to get a potential final state of this PR that we 
might merge?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-12 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-183389977
  
About `vendor.css` etc.: I guess I did not commit after the last run of gulp, 
so the committed versions are dirty... It worked for me -- I guess you would 
need to test it again after I have committed.





[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-12 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-183400616
  
Updated. I did not take your last two comments about `@SuppressWarnings` 
and `@SuppressWarnings("serial")` because this commit should not touch those 
files (no unrelated changes within a commit).




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52597099
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/StoppableFunction.java
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may 
not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the 
License.
+ */
+package org.apache.flink.api.common.functions;
+
+/**
+ * Must be implemented by stoppable functions, eg, source functions of 
streaming jobs. The method {@link #stop()} will
+ * be called when the job received the STOP signal. On this signal, the 
source function must stop emitting new data and
+ * terminate gracefully.
+ */
+public interface StoppableFunction {
+   /**
+* Stops the source. In contrast to {@code cancel()} this is a request 
to the source function to shut down
+* gracefully. Pending data can still be emitted and it is not required 
to stop immediately -- however, in the near
+* future. The job will keep running until all emitted data is 
processed completely.
+* 
+* Most streaming sources will have a while loop inside the {@code 
run()} method. You need to ensure that the source
+* will break out of this loop. This can be achieved by having a 
volatile field "isRunning" that is checked in the
+* loop and that is set to false in this method.
+* 
+* The call to {@code stop()} should not block and not throw 
any exception.
+*/
+   public void stop();
--- End diff --

interface methods are always public
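
For reference, the pattern described in the Javadoc above (a volatile `isRunning` field checked in the loop and cleared in `stop()`) could look roughly like the sketch below. `NumberSource` and its `maxRecords` guard are hypothetical and exist only so the example terminates; the interface is declared without the redundant `public` modifier.

```java
// Stand-in for the interface from the diff; the method is implicitly public.
interface StoppableFunction {
    void stop();
}

// Hypothetical source that counts records until it is asked to stop.
class NumberSource implements StoppableFunction {
    // volatile so that a stop() call from another thread becomes visible to the loop
    private volatile boolean isRunning = true;
    private long emitted = 0;

    // Emits until stop() clears the flag; maxRecords only bounds the demo.
    long run(long maxRecords) {
        while (isRunning && emitted < maxRecords) {
            emitted++;
        }
        return emitted;
    }

    // Does not block and does not throw; it only flips the flag.
    @Override
    public void stop() {
        isRunning = false;
    }
}
```

Because `stop()` only sets a flag, the loop finishes its current iteration and then exits on its own, which matches the "gracefully, in the near future" wording of the Javadoc.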




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-11 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-182915371
  
It seems that reverting `vendor.css` and `vendor.js` to the latest master 
version does the trick :-)

I've tested the stop functionality locally and on yarn using both the cli 
and the web interface. Works like a charm.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52598306
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -464,6 +464,33 @@ class JobManager(
   )
   }
 
+case StopJob(jobID) =>
+  log.info(s"Trying to stop job with ID $jobID.")
+
+  currentJobs.get(jobID) match {
+case Some((executionGraph, _)) =>
+  try {
+if (!executionGraph.isStoppable()) {
+  sender ! StoppingFailure(jobID, new 
IllegalStateException(s"Job with ID $jobID" +
+" is not stoppable."))
+} else if(executionGraph.getState() != JobStatus.CREATED
+&& executionGraph.getState() != JobStatus.RUNNING
+&& executionGraph.getState() != JobStatus.RESTARTING) {
+  sender ! StoppingFailure(jobID, new 
IllegalStateException(s"Job with ID $jobID" +
--- End diff --

Maybe we could say what the current state of the job is and that it cannot 
be stopped because of that.
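
Something along these lines, sketched in plain Java rather than in the actor code. The `JobStatus` enum here is a hypothetical subset of the real states, and `StopPrecondition` is an illustrative helper, not an existing class:

```java
import java.util.EnumSet;
import java.util.Optional;

// Hypothetical subset of job states, for illustration only.
enum JobStatus { CREATED, RUNNING, RESTARTING, FAILING, CANCELED, FINISHED }

class StopPrecondition {
    private static final EnumSet<JobStatus> STOPPABLE_STATES =
            EnumSet.of(JobStatus.CREATED, JobStatus.RUNNING, JobStatus.RESTARTING);

    // Returns an error message naming the current state,
    // or an empty Optional if the job may be stopped.
    static Optional<String> check(String jobId, JobStatus current) {
        if (STOPPABLE_STATES.contains(current)) {
            return Optional.empty();
        }
        return Optional.of("Job with ID " + jobId + " is in state " + current
                + " and cannot be stopped; stopping requires one of "
                + STOPPABLE_STATES + ".");
    }
}
```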




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52598713
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -464,6 +464,33 @@ class JobManager(
   )
   }
 
+case StopJob(jobID) =>
+  log.info(s"Trying to stop job with ID $jobID.")
+
+  currentJobs.get(jobID) match {
+case Some((executionGraph, _)) =>
+  try {
+if (!executionGraph.isStoppable()) {
+  sender ! StoppingFailure(jobID, new 
IllegalStateException(s"Job with ID $jobID" +
+" is not stoppable."))
+} else if(executionGraph.getState() != JobStatus.CREATED
+&& executionGraph.getState() != JobStatus.RUNNING
+&& executionGraph.getState() != JobStatus.RESTARTING) {
+  sender ! StoppingFailure(jobID, new 
IllegalStateException(s"Job with ID $jobID" +
+"is not in state CREATED, RUNNING, or RESTARTING."))
+} else {
+  executionGraph.stop()
+  sender ! StoppingSuccess(jobID)
--- End diff --

`decorateMessage`




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52598725
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -464,6 +464,33 @@ class JobManager(
   )
   }
 
+case StopJob(jobID) =>
+  log.info(s"Trying to stop job with ID $jobID.")
+
+  currentJobs.get(jobID) match {
+case Some((executionGraph, _)) =>
+  try {
+if (!executionGraph.isStoppable()) {
+  sender ! StoppingFailure(jobID, new 
IllegalStateException(s"Job with ID $jobID" +
+" is not stoppable."))
+} else if(executionGraph.getState() != JobStatus.CREATED
+&& executionGraph.getState() != JobStatus.RUNNING
+&& executionGraph.getState() != JobStatus.RESTARTING) {
+  sender ! StoppingFailure(jobID, new 
IllegalStateException(s"Job with ID $jobID" +
+"is not in state CREATED, RUNNING, or RESTARTING."))
+} else {
+  executionGraph.stop()
+  sender ! StoppingSuccess(jobID)
+}
+  } catch {
+case t: Throwable =>  sender ! StoppingFailure(jobID, t)
+  }
+case None =>
+  log.info(s"No job found with ID $jobID.")
+  sender ! StoppingFailure(jobID, new IllegalArgumentException("No 
job found with " +
--- End diff --

`decorateMessage`




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52598719
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -464,6 +464,33 @@ class JobManager(
   )
   }
 
+case StopJob(jobID) =>
+  log.info(s"Trying to stop job with ID $jobID.")
+
+  currentJobs.get(jobID) match {
+case Some((executionGraph, _)) =>
+  try {
+if (!executionGraph.isStoppable()) {
+  sender ! StoppingFailure(jobID, new 
IllegalStateException(s"Job with ID $jobID" +
+" is not stoppable."))
+} else if(executionGraph.getState() != JobStatus.CREATED
+&& executionGraph.getState() != JobStatus.RUNNING
+&& executionGraph.getState() != JobStatus.RESTARTING) {
+  sender ! StoppingFailure(jobID, new 
IllegalStateException(s"Job with ID $jobID" +
+"is not in state CREATED, RUNNING, or RESTARTING."))
+} else {
+  executionGraph.stop()
+  sender ! StoppingSuccess(jobID)
+}
+  } catch {
+case t: Throwable =>  sender ! StoppingFailure(jobID, t)
--- End diff --

`decorateMessage`




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52598649
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -464,6 +464,33 @@ class JobManager(
   )
   }
 
+case StopJob(jobID) =>
+  log.info(s"Trying to stop job with ID $jobID.")
+
+  currentJobs.get(jobID) match {
+case Some((executionGraph, _)) =>
+  try {
+if (!executionGraph.isStoppable()) {
+  sender ! StoppingFailure(jobID, new 
IllegalStateException(s"Job with ID $jobID" +
+" is not stoppable."))
+} else if(executionGraph.getState() != JobStatus.CREATED
+&& executionGraph.getState() != JobStatus.RUNNING
+&& executionGraph.getState() != JobStatus.RESTARTING) {
+  sender ! StoppingFailure(jobID, new 
IllegalStateException(s"Job with ID $jobID" +
--- End diff --

`decorateMessage` is missing




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52598106
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StoppableTask.java
 ---
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.jobgraph.tasks;
+
+/**
+ * Implemented by tasks that can receive STOP signal.
+ */
+public interface StoppableTask {
+   /** Called on STOP signal. */
+   public void stop();
--- End diff --

public is not required here
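
A quick way to confirm this is to declare the method without any modifier and inspect it via reflection. `InterfaceModifierDemo` is a hypothetical helper written for this check:

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Same shape as the interface in the diff, minus the redundant modifier.
interface StoppableTask {
    /** Called on STOP signal. */
    void stop();
}

class InterfaceModifierDemo {
    // Reports whether stop() is public and abstract although neither keyword is written.
    static boolean isStopPublicAbstract() {
        try {
            Method stop = StoppableTask.class.getMethod("stop");
            int modifiers = stop.getModifiers();
            return Modifier.isPublic(modifiers) && Modifier.isAbstract(modifiers);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isStopPublicAbstract());
    }
}
```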




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52598636
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -464,6 +464,33 @@ class JobManager(
   )
   }
 
+case StopJob(jobID) =>
+  log.info(s"Trying to stop job with ID $jobID.")
+
+  currentJobs.get(jobID) match {
+case Some((executionGraph, _)) =>
+  try {
+if (!executionGraph.isStoppable()) {
+  sender ! StoppingFailure(jobID, new 
IllegalStateException(s"Job with ID $jobID" +
--- End diff --

`decorateMessage` is missing




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52598511
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -417,6 +417,28 @@ class TaskManager(
 log.debug(s"Cannot find task to fail for execution 
$executionID)")
   }
 
+// stops a task
+case StopTask(executionID) =>
+  val task = runningTasks.get(executionID)
+  if (task != null) {
+try {
+  task.stopExecution()
+  sender ! decorateMessage(new 
TaskOperationResult(executionID, true))
+} catch {
+  case t: Throwable =>
+sender ! new TaskOperationResult(executionID, 
false,
--- End diff --

decorateMessage is missing here
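
One way to avoid forgetting the call in individual branches is to route every reply through a single helper. The sketch below is plain Java and only illustrates the idea: `Envelope` and `ReplyChannel` are hypothetical, and it assumes that `decorateMessage` wraps an outgoing message in some envelope (for example one carrying a session ID).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical envelope; stands in for whatever decorateMessage produces.
final class Envelope {
    final String sessionId;
    final Object payload;

    Envelope(String sessionId, Object payload) {
        this.sessionId = sessionId;
        this.payload = payload;
    }
}

// Hypothetical reply channel that records what would be sent to the caller.
class ReplyChannel {
    private final String sessionId;
    private final List<Object> sent = new ArrayList<>();

    ReplyChannel(String sessionId) {
        this.sessionId = sessionId;
    }

    // Assumed behaviour: wrap the message together with the session ID.
    Object decorateMessage(Object message) {
        return new Envelope(sessionId, message);
    }

    // Single exit point: success and failure replies are both decorated.
    void reply(Object message) {
        sent.add(decorateMessage(message));
    }

    List<Object> sent() {
        return sent;
    }
}
```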




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52597604
  
--- Diff: docs/apis/cli.md ---
@@ -248,6 +252,16 @@ Action "cancel" cancels a running program.
configuration.
 
 
+Action "stop" stops a running program (streaming jobs only).
--- End diff --

Maybe we should add that it is a stop signal without strong consistency 
guarantees.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-11 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r52601121
  
--- Diff: flink-runtime-web/web-dashboard/web/css/vendor.css ---
@@ -7704,6 +7704,7 @@ button.close {
 .modal-header {
   padding: 15px;
   border-bottom: 1px solid #e5e5e5;
+  min-height: 16.42857143px;
--- End diff --

If I'm not mistaken, the `vendor.css` file is generated and should not 
be changed. CSS style changes should go to `app/styles/`.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-11 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-182859453
  
Really good work @mjsax.

I had a couple of minor inline comments with respect to the latest commit 
and some with respect to the overall changes (especially concerning the 
`decorateMessage` method).

Why have you changed the `vendor.js` and the `index.js` file? I thought 
that these files should stay unchanged.

I was wondering whether we shouldn't make the `StoppableSourceStreamTask` 
and the `StoppableStreamSource` type safe, e.g. enforcing that the 
`StreamSource` and the `SourceFunction` are actually stoppable. Currently, this 
is not properly checked/enforced. This should be done by the compiler.
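
A rough sketch of what compiler-enforced type safety could look like, using an intersection bound on the function type. The types are simplified stand-ins rather than the actual Flink classes, and `TickSource` is a hypothetical source:

```java
// Simplified stand-ins for illustration; these are not the real Flink classes.
interface SourceFunction<T> {
    void run();
}

interface StoppableFunction {
    void stop();
}

class StreamSource<OUT, SRC extends SourceFunction<OUT>> {
    protected final SRC userFunction;

    StreamSource(SRC userFunction) {
        this.userFunction = userFunction;
    }
}

// The intersection bound makes the compiler reject functions that are not stoppable.
class StoppableStreamSource<OUT, SRC extends SourceFunction<OUT> & StoppableFunction>
        extends StreamSource<OUT, SRC> {

    StoppableStreamSource(SRC userFunction) {
        super(userFunction);
    }

    // No cast needed: SRC is statically known to be a StoppableFunction.
    void stop() {
        userFunction.stop();
    }
}

// Hypothetical source that satisfies both bounds.
class TickSource implements SourceFunction<Long>, StoppableFunction {
    boolean stopped = false;

    @Override
    public void run() {}

    @Override
    public void stop() {
        stopped = true;
    }
}
```

With this bound, instantiating `StoppableStreamSource` with a function type that only implements `SourceFunction` does not compile, so the mistake surfaces at build time instead of at runtime.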




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-11 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-182884939
  
The WebUI is not shown when I build the PR and start a local cluster. Does 
it work for you?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-11 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-182917968
  
I've opened a PR against your branch which makes the 
`StoppableSourceStreamTask` and the `StoppableStreamSource` type safe. 
Furthermore, it fixes the problem with the web dashboard files.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-10 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-182393013
  
@tillrohrmann just updated this. Let's see if you think it is ready now... 
;)




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-08 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-181402377
  
Ok. Your arguments are very strong. I will update this PR accordingly.

About `FlinkKafkaConsumer08` (and actually all other available sources): 
IMHO, we need a new JIRA to update all sources with a proper `stop()` 
implementation (where it is suitable). This will include getting a last 
consistent snapshot etc. We should also enable stopping in combination with the 
recently introduced savepoints. But of course, I don't want to extend this PR 
further; those features can be added as later improvements on top of the STOP feature.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-08 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-181419327
  
Yes exactly. Let us tackle the next steps (adding stopping functionality to 
more sources, consistent behaviour, drawing savepoint, etc.) once we have a 
first version of the stop signal merged.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-07 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-181040537
  
I don't see much of a difference here.
(1) Of course, we can have a stoppable flag instead of the `JobType`; but I 
don't see any advantage. Why should it be bad to add a job type -- it is 
basically meta information in both cases anyway? And for batch jobs, being 
stoppable does not make much sense anyway.
(2) From my point of view, all `StreamSource`s should be stoppable. I 
cannot think of a source that is not stoppable -- I am also wondering why 
Kafka sources should not be stoppable properly (on stop, they just do not 
consume any more data from Kafka).
(3) If we keep it as is, there is no reason to infer this. If we apply 
your changes, I agree that it would make sense to detect and set the stoppable 
flag automatically.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-07 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-181072411
  
We have managed so far not to make a distinction between batch and 
streaming jobs, which was very good. Given that the future plan is that 
streaming jobs subsume everything, introducing such a change now starts this 
off in the wrong direction, in my opinion.

Defining a flag for what it actually should mean (i.e. is the job 
stoppable) ties the change to the feature, rather than introducing a 
distinction we work hard not to make.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-05 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-180617867
  
Ok, let us include as a first version a stop implementation where you don't 
have the strong consistency guarantees. But before that, I think we should still 
change three things.

The first thing I would like to change is the `JobType`. I feel a bit 
uneasy adding explicit information whether a job is a streaming or a batch job 
to the `JobGraph`. So far we've succeeded in keeping the runtime agnostic to 
the type of job. I would propose to change it to a `stoppable` flag that 
indicates whether the job is stoppable or not. Semantically, it should not 
make a big difference.

The second thing I would propose is to remove `stop` from the 
`StreamSource` class in order to make it not mandatory for all streaming 
sources to be stoppable. Instead I would also introduce a `StoppableOperator` 
interface which can be implemented by streaming sources which are in fact 
stoppable. I'm not so sure, for example, whether the Kafka sources are properly 
stoppable without throwing exceptions. By introducing this interface we can 
selectively implement stop methods for sources. A nice side effect is that 
existing code won't break if you have a custom source implementation.

And as a last point I would propose to automatically infer whether a job is 
stoppable or not. What we have to do is to check that all source functions 
implement the `StoppableOperator` interface. This should be doable in the 
`StreamGraphGenerator`. Based on this information we can then set the stoppable 
flag in the `JobGraph` or not.

What do you think? Would that be feasible?
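
A minimal sketch of the inference step proposed above. It assumes a hypothetical marker interface `StoppableSourceSketch` and a plain list of sources standing in for whatever the `StreamGraphGenerator` actually sees. None of these names come from the Flink code base, and treating a job without sources as not stoppable is an assumption of the sketch.

```java
import java.util.List;

// Hypothetical stand-in for a source function; not Flink's actual interface.
interface SourceSketch {
    void run();
}

// Hypothetical marker interface for sources that support a clean stop.
interface StoppableSourceSketch extends SourceSketch {
    void stop();
}

final class StoppableInference {
    private StoppableInference() {}

    // A job counts as stoppable only if it has at least one source and
    // every source implements the stoppable interface.
    static boolean isStoppable(List<? extends SourceSketch> sources) {
        if (sources.isEmpty()) {
            return false; // assumption: no sources means nothing to stop
        }
        for (SourceSketch source : sources) {
            if (!(source instanceof StoppableSourceSketch)) {
                return false;
            }
        }
        return true;
    }
}

// A source without stop support, e.g. an existing custom source.
final class PlainSource implements SourceSketch {
    @Override
    public void run() {}
}

// A source that opts in to being stoppable.
final class CleanlyStoppableSource implements StoppableSourceSketch {
    private volatile boolean running = true;

    @Override
    public void run() {
        while (running) {
            Thread.yield();
        }
    }

    @Override
    public void stop() {
        running = false;
    }
}
```

With this shape, existing custom sources keep compiling unchanged and simply make their job non-stoppable until they implement the interface.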




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-03 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-179261307
  
I think we can get this done today or tomorrow and then finally merge the 
PR. Looking forward to having this code in the master branch :-)




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-03 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-179260525
  
Good work @mjsax. I had one last comment. It concerns stopping a job when 
it's in the state `RESTARTING`. I think we have to allow the state transition 
`RESTARTING` to `FINISHED` when the user calls stop on the job (see my code 
comment).

What do you think?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-02-02 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-178836268
  
> I had some more comments concerning the failure case handling of stop 
calls. The first problem is still the handling of exceptions when calling stop 
on the Invokable in Task.stopExecution. The exception will only be logged but 
no further action is taken. This can lead to a situation where we have a 
corrupted state. I think, we should fail the task in such a situation.

I see what you mean, but we would not get a corrupted state, would we? 
However, I agree that if `Task.stopExecution` throws an exception, we should 
report this to the user somehow (not just by logging). Failing the task is fine 
with me. I updated the code for this.

> Additionally, the case that a task cannot be found on the TaskManager and 
that an exception occurs in Task.stopExecution are treated identically by 
sending a TaskOperationResult with success == false to the JobManager. On the 
JobManager side this will only be logged. I think the exception case should be 
handled differently. Failing the execution, for example.

This should not be necessary. There are two exception cases here: (1) The 
task is not stoppable (i.e., for a batch job). Failing the execution would of 
course not be appropriate. (2) `Task.stopExecution` throws an exception. This 
case is already handled by the discussion above.

> And it is still possible that you send a StopJob message to the 
JobManager, see that the job is in state RUNNING, then the ExecutionGraph 
switches to RESTARTING, and then the stop call is executed on the 
ExecutionGraph which won't have an effect. As a user you will receive a 
StoppingSuccess message but the job will simply be restarted. I think we should 
also allow stopping jobs when they are in the state RESTARTING.

Design decision.  I extended allowed states for STOP to CREATED, RUNNING, 
and RESTARTING, which is the easier solution. Not sure if we should include 
CREATED or not though... 
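
To make the extended precondition concrete, here is a small sketch of the check, assuming a simplified state enum. `JobStateSketch` and `StopPreconditions` are illustrative names only and are not the classes touched by this PR.

```java
import java.util.EnumSet;
import java.util.Set;

// Simplified job states for illustration, modeled on the states named in this thread.
enum JobStateSketch { CREATED, RUNNING, RESTARTING, FAILED, FINISHED }

final class StopPreconditions {
    // States in which a STOP request is accepted, as described above.
    static final Set<JobStateSketch> STOPPABLE_STATES =
            EnumSet.of(JobStateSketch.CREATED, JobStateSketch.RUNNING, JobStateSketch.RESTARTING);

    private StopPreconditions() {}

    // Returns true if a stop request may be issued in the given state.
    static boolean acceptsStop(JobStateSketch state) {
        return STOPPABLE_STATES.contains(state);
    }
}
```

Dropping `CREATED` from the set later would be a one-line change, which keeps the open question above cheap to revisit.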




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-01-19 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-172791340
  
Are we gonna get this in for the 1.0 release?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-01-19 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-172805169
  
Great to hear :-) I guess in the next couple of weeks, but of course it 
depends on the community.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-01-19 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-172792701
  
That is my plan. Is there already a planned date to release 1.0?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-01-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49442110
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/Stoppable.java
 ---
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may 
not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the 
License.
+ */
+package org.apache.flink.runtime.jobgraph.tasks;
+
+/**
+ * Must be implemented by stoppable tasks.
--- End diff --

What are stoppable tasks? Maybe a bit more context would be helpful. E.g. 
it should only be implemented by streaming sources, the method `stop` is called 
when the JM receives a stop message, etc.
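
For illustration, the extra context could look roughly like the Javadoc below. This is only a sketch of possible wording: the interface is named `StoppableSketch` to make clear it is not the file under review, and `FlagBasedSource` is a made-up example implementation.

```java
/**
 * Implemented by tasks that can be stopped gracefully.
 *
 * <p>Intended for streaming sources only. The {@code stop()} method is called
 * after the JobManager has received a stop message for the job, so that the
 * source can leave its main loop and the job can finish regularly.
 */
interface StoppableSketch {
    /** Signals the task to leave its main loop. */
    void stop();
}

// Made-up example: a source whose main loop is guarded by a volatile flag.
final class FlagBasedSource implements StoppableSketch {
    // volatile, because stop() is typically called from a different thread
    private volatile boolean running = true;

    boolean isRunning() {
        return running;
    }

    @Override
    public void stop() {
        running = false;
    }
}
```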




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-01-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49441547
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -402,21 +405,52 @@ public void onComplete(Throwable failure, Object 
success) throws Throwable {
}
}
 
+   public void stop() {
+   // sends stop RPC call
--- End diff --

I think this belongs in the JavaDocs of this function.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-01-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49455037
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
 ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.taskmanager;
+
+import java.lang.reflect.Field;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.Stoppable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ TaskDeploymentDescriptor.class, JobID.class, 
FiniteDuration.class })
+public class TaskStopTest {
+   private AbstractInvokable taskMock;
+   private Task task;
+
+   public void doMocking() throws Exception {
+   TaskDeploymentDescriptor tddMock = 
mock(TaskDeploymentDescriptor.class);
+   when(tddMock.getNumberOfSubtasks()).thenReturn(1);
+   when(tddMock.getJobID()).thenReturn(mock(JobID.class));
+   when(tddMock.getVertexID()).thenReturn(mock(JobVertexID.class));
+   
when(tddMock.getExecutionId()).thenReturn(mock(ExecutionAttemptID.class));
+   when(tddMock.getTaskName()).thenReturn("taskName");
+   
when(tddMock.getJobConfiguration()).thenReturn(mock(Configuration.class));
+   
when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class));
+   when(tddMock.getInvokableClassName()).thenReturn("className");
+
+   task = new Task(tddMock, mock(MemoryManager.class), 
mock(IOManager.class), mock(NetworkEnvironment.class),
+   mock(BroadcastVariableManager.class), 
mock(ActorGateway.class), mock(ActorGateway.class),
+   mock(FiniteDuration.class), 
mock(LibraryCacheManager.class), mock(FileCache.class),
+   mock(TaskManagerRuntimeInfo.class));
+   Field f = task.getClass().getDeclaredField("invokable");
+   f.setAccessible(true);
+   f.set(task, taskMock);
--- End diff --

The same still applies to `task`. Why do you use a class field for a return 
value?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-01-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49454567
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.BaseTestingActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import scala.concurrent.ExecutionContext;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionVertex.class)
+public class ExecutionVertexStopTest {
--- End diff --

`extends TestLogger` missing




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-01-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49455315
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/StoppableInvokable.java
 ---
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may 
not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the 
License.
+ */
+package org.apache.flink.runtime.testutils;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.Stoppable;
+
+public final class StoppableInvokable extends AbstractInvokable implements 
Stoppable {
+   private boolean isRunning = true;
+
+   @Override
+   public void registerInputOutput() {}
+
+   @Override
+   public void invoke() throws Exception {
+   while(isRunning) {
+   Thread.sleep(100);
+   }
+   }
+
+   @Override
+   public void stop() {
+   this.isRunning = false;
+   }
+}
--- End diff --

No newline at end of file




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-01-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49441422
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/StoppingException.java ---
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime;
+
+public class StoppingException extends Exception {
--- End diff --

JavaDocs missing




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-01-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49451808
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobType;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StoppingException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.api.mockito.PowerMockito;
+
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.same;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionGraph.class)
+public class ExecutionGraphSignalsTest {
+   private ExecutionJobVertex[] mockEJV = new ExecutionJobVertex[5];
+   private int[] dop = new int[] { 5, 7, 2, 11, 4 };
+   private ExecutionVertex[][] mockEV = new 
ExecutionVertex[mockEJV.length][];
+   private ExecutionGraph eg;
+   private Field f;
+
+   @Before
+   public void prepare() throws Exception {
+   final JobID jobId = new JobID();
+   final String jobName = "Test Job Sample Name";
+   final Configuration cfg = new Configuration();
+
+
+   assert (mockEJV.length == 5);
+   JobVertex v1 = new JobVertex("vertex1");
+   JobVertex v2 = new JobVertex("vertex2");
+   JobVertex v3 = new JobVertex("vertex3");
+   JobVertex v4 = new JobVertex("vertex4");
+   JobVertex v5 = new JobVertex("vertex5");
+
+   for(int i = 0; i < mockEJV.length; ++i) {
+   mockEJV[i] = mock(ExecutionJobVertex.class);
+
+   this.mockEV[i] = new ExecutionVertex[dop[i]];
+   for (int j = 0; j < dop[i]; ++j) {
+   this.mockEV[i][j] = mock(ExecutionVertex.class);
+   }
+
+   when(mockEJV[i].getProducedDataSets()).thenReturn(new 
IntermediateResult[0]);
+   
when(mockEJV[i].getTaskVertices()).thenReturn(this.mockEV[i]);
+   }
+
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+   .withArguments(any(ExecutionGraph.class), same(v1), 
any(Integer.class).intValue(),
+   any(FiniteDuration.class), 
any(Long.class).longValue()).thenReturn(mockEJV[0]);
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+   .withArguments(any(ExecutionGraph.class), same(v2), 
any(Integer.class).intValue(),
+   any(FiniteDuration.class), 
any(Long.class).longValue()).thenReturn(mockEJV[1]);
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+   .withArguments(any(ExecutionGraph.class), same(v3), 
any(Integer.class).intValue(),
+   any(FiniteDuration.class), 

[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-01-12 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-170947333
  
Once we've resolved these problems, I'll test the code on the cluster and 
YARN. If they pass, then we should merge this PR.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-01-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49461844
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -417,6 +417,28 @@ class TaskManager(
 log.debug(s"Cannot find task to fail for execution 
$executionID)")
   }
 
+// stops a task
+case StopTask(executionID) =>
+  val task = runningTasks.get(executionID)
+  if (task != null) {
+try {
+  task.stopExecution()
+  sender ! decorateMessage(new 
TaskOperationResult(executionID, true))
+} catch {
+  case t: Throwable =>
+sender ! new TaskOperationResult(executionID, 
false,
--- End diff --

Sending back a `TaskOperationResult(.., false, ...)` for every `Throwable` 
will result in a logging statement on the `JobManager` side that the 
corresponding task could not be found. This seems to be wrong since the task 
was found but an exception occurred while calling stop on it. The `stop` method 
of `Task` can throw an `UnsupportedOperationException` which should lead on the 
`JobManager` side to a logging statement `Task does not support stop`. Every 
other exception should lead to a cancellation of the corresponding `Task`, 
because something obviously went wrong.
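
One way to express the three cases separately is sketched below in plain Java. The types `StopOutcome`, `StoppableTaskHandle` and `StopDispatcher` are hypothetical and only illustrate the distinction; the actual handling lives in the Scala `TaskManager` actor and may look quite different.

```java
// Hypothetical result type that keeps the three failure reasons apart.
enum StopOutcome { STOPPED, TASK_NOT_FOUND, STOP_NOT_SUPPORTED, STOP_FAILED }

// Hypothetical stand-in for the task object held by the TaskManager.
interface StoppableTaskHandle {
    void stopExecution();
}

final class StopDispatcher {
    private StopDispatcher() {}

    // Maps each situation to its own outcome instead of a single "false".
    static StopOutcome tryStop(StoppableTaskHandle task) {
        if (task == null) {
            return StopOutcome.TASK_NOT_FOUND;
        }
        try {
            task.stopExecution();
            return StopOutcome.STOPPED;
        } catch (UnsupportedOperationException e) {
            // the task exists but is not stoppable: log only
            return StopOutcome.STOP_NOT_SUPPORTED;
        } catch (RuntimeException e) {
            // something went wrong while stopping: the caller should cancel or fail the task
            return StopOutcome.STOP_FAILED;
        }
    }
}
```

The receiver can then log a precise message for `TASK_NOT_FOUND` and `STOP_NOT_SUPPORTED`, and react to `STOP_FAILED` by cancelling the task.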




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-01-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49462409
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -742,10 +743,38 @@ private void notifyFatalError(String message, 
Throwable cause) {
}
 
// 

-   //  Canceling / Failing the task from the outside
+   //  Stopping / Canceling / Failing the task from the outside
// 

 
/**
+* Stops the executing task by calling {@link Stoppable#stop()}.
+* 
+* This method never blocks.
+* 
+* 
+* @throws UnsupportedOperationException
+* if the {@link AbstractInvokable} does not implement 
{@link Stoppable}
+*/
+   public void stopExecution() throws UnsupportedOperationException {
+   LOG.info("Attempting to stop task " + taskNameWithSubtask);
+   if(this.invokable instanceof Stoppable) {
+   Runnable runnable = new Runnable() {
+   @Override
+   public void run() {
+   try {
+   
((Stoppable)Task.this.invokable).stop();
+   } catch(RuntimeException e) {
+   LOG.error("Stopping task " + 
taskNameWithSubtask + " failed.", e);
--- End diff --

Shouldn't a `RuntimeException` cause the task to fail? We should notify the 
`JobManager` about this and fail the corresponding task. E.g. we could send a 
`FailTask` message to the `TaskManager` to fail the task.
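
A rough sketch of what forwarding the failure could look like, with the stop call still running on its own thread so the caller does not block. `StoppableWork`, `FailureHandler` and `AsyncStopper` are placeholder names; in the real code the handler would presumably send a `FailTask` message or fail the task directly.

```java
// Placeholder for the invokable that supports stop().
interface StoppableWork {
    void stop();
}

// Placeholder for whatever fails the task, e.g. by notifying the TaskManager.
interface FailureHandler {
    void failTask(Throwable cause);
}

final class AsyncStopper {
    private AsyncStopper() {}

    // Runs stop() on a separate thread. A RuntimeException is handed to the
    // failure handler instead of only being logged.
    static Thread stopAsync(final StoppableWork work, final FailureHandler handler) {
        Thread stopper = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    work.stop();
                } catch (RuntimeException e) {
                    handler.failTask(e);
                }
            }
        }, "stop-sketch");
        stopper.setDaemon(true);
        stopper.start();
        return stopper;
    }
}
```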




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-01-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49463485
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -402,21 +405,52 @@ public void onComplete(Throwable failure, Object 
success) throws Throwable {
}
}
 
+   public void stop() {
+   // sends stop RPC call
+
+   final SimpleSlot slot = this.assignedResource;
+
+   if (slot != null) {
+   final ActorGateway gateway = 
slot.getInstance().getActorGateway();
+
+   Future stopResult = gateway.retry(
+   new StopTask(attemptId),
+   NUM_STOP_CALL_TRIES,
+   timeout,
+   executionContext);
+
+   stopResult.onComplete(new OnComplete() {
+   @Override
+   public void onComplete(Throwable failure, 
Object success) throws Throwable {
+   if (failure != null) {
+   fail(new Exception("Task could 
not be stopped.", failure));
+   } else {
+   TaskOperationResult result = 
(TaskOperationResult) success;
+   if (!result.success()) {
--- End diff --

Does this condition really say that the task could not be found? In the 
current implementation a `TaskOperationResult` message with `success == false` 
is also returned if an exception is thrown in `Task.stop`.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-01-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49463254
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -427,6 +427,28 @@ class JobManager(
   )
   }
 
+case StopJob(jobID) =>
+  log.info(s"Trying to stop job with ID $jobID.")
+
+  currentJobs.get(jobID) match {
+case Some((executionGraph, _)) =>
+  try {
+if (executionGraph.getState() != JobStatus.RUNNING) {
--- End diff --

This guard is not sufficient since job state changes can happen 
asynchronously. Thus, it might happen that the current state is 
`JobStatus.RUNNING` when checking this condition, but before you execute 
`executionGraph.stop()` it will change to `JobStatus.RESTARTING`, for example. 
As a consequence, the sender will receive a `StoppingSuccess` message even 
though the job won't be stopped.
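
A minimal sketch of one way to close this check-then-act gap, assuming the job state were held in an atomic reference. The names `SketchJobStatus`, `SketchGraph`, `tryStop` and the `STOPPING` state are hypothetical and only serve the illustration; they are not taken from this PR:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-ins for illustration; not Flink's JobStatus or ExecutionGraph.
enum SketchJobStatus { CREATED, RUNNING, STOPPING, RESTARTING, FINISHED }

class SketchGraph {
    private final AtomicReference<SketchJobStatus> state =
            new AtomicReference<>(SketchJobStatus.CREATED);

    SketchJobStatus getState() {
        return state.get();
    }

    // Atomically moves from 'expected' to 'next'; returns false if the
    // state was changed by someone else in the meantime.
    boolean transition(SketchJobStatus expected, SketchJobStatus next) {
        return state.compareAndSet(expected, next);
    }

    // Check and act in one atomic step: returns true only if this call
    // moved the job from RUNNING to STOPPING.
    boolean tryStop() {
        if (!transition(SketchJobStatus.RUNNING, SketchJobStatus.STOPPING)) {
            return false; // the caller would reply with a failure message
        }
        // ... forward the stop signal to the sources here ...
        return true;
    }
}
```

With something like this, the `JobManager` could answer with `StoppingSuccess` only if `tryStop()` returned `true`, instead of relying on a separate `getState()` check.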




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-01-12 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r49463705
  
--- Diff: flink-runtime-web/web-dashboard/web/partials/jobs/job.html ---
@@ -33,6 +33,7 @@
   {{ job['end-time'] | amDateFormat:'-MM-DD, H:mm:ss' 
}}
   {{job.duration | 
humanizeDuration:true}}
   Cancel
+  Stop
--- End diff --

Why can't I stop a Flink job when it is in state `RESTARTING`?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-01-06 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-169267260
  
Just updated this. I did not add an additional YARN test, because I test 
`GET` and `DELETE` in the REST test. About your last comment regarding 
`Execution`: the behavior of STOP is the same as for CANCEL. I think we should 
keep it this way for consistency.

Btw: I never tested in a YARN setup...




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-01-06 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-169344534
  
Travis failed with a weird compilation error. I guess this is a caching 
issue. It builds locally and on my personal Travis: 
https://travis-ci.org/mjsax/flink/builds/100576933




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-01-04 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-168673112
  
Just curious what the current state of this PR is.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-01-04 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-168746132
  
Addressing your comments is WIP... Hope to get it done this week.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2016-01-04 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-168749820
  
Great to hear :-)

On Mon, Jan 4, 2016 at 6:33 PM, Matthias J. Sax 
wrote:

> Addressing your comments is WIP... Hope to get it done this week.





[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-04 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46670585
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -732,6 +758,18 @@ else if (current == JobStatus.RESTARTING) {
}
}
 
+   public void stop() {
+   if(jobType == JobType.STREAMING) {
+   for(ExecutionVertex ev : 
this.getAllExecutionVertices()) {
+   if(ev.getNumberOfInputs() == 0) { // send 
signal to sources only
+   ev.stop();
+   }
+   }
+   } else {
+   throw new RuntimeException("STOP is only supported by 
streaming jobs.");
--- End diff --

But I think that it is a good idea to introduce a `StoppingException` 
because it's definitely not a `RuntimeException`.
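
A rough sketch of what such a checked exception could look like, together with a caller that turns it into a reply. Only the name `StoppingException` comes from this discussion; `SketchJobType`, `SketchStoppableGraph` and the string replies are hypothetical placeholders:

```java
// Hypothetical checked exception; only the name follows the suggestion in this thread.
class StoppingException extends Exception {
    StoppingException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for the job type used in the diff.
enum SketchJobType { BATCH, STREAMING }

class SketchStoppableGraph {
    private final SketchJobType jobType;
    private int stopSignalsSent = 0;

    SketchStoppableGraph(SketchJobType jobType) {
        this.jobType = jobType;
    }

    int getStopSignalsSent() {
        return stopSignalsSent;
    }

    // The throws clause forces every caller to handle the unsupported case.
    void stop() throws StoppingException {
        if (jobType != SketchJobType.STREAMING) {
            throw new StoppingException("STOP is only supported by streaming jobs.");
        }
        stopSignalsSent++; // placeholder for signalling the sources
    }

    // Caller side: convert the exception into a reply instead of letting it propagate.
    String handleStopRequest() {
        try {
            stop();
            return "StoppingSuccess";
        } catch (StoppingException e) {
            return "StoppingFailure: " + e.getMessage();
        }
    }
}
```

The point of the checked variant is only that the compiler reminds the caller to handle it; the catch block in the `JobManager` would stay as it is.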




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-04 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46667227
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -732,6 +758,18 @@ else if (current == JobStatus.RESTARTING) {
}
}
 
+   public void stop() {
+   if(jobType == JobType.STREAMING) {
+   for(ExecutionVertex ev : 
this.getAllExecutionVertices()) {
+   if(ev.getNumberOfInputs() == 0) { // send 
signal to sources only
+   ev.stop();
+   }
+   }
+   } else {
+   throw new RuntimeException("STOP is only supported by 
streaming jobs.");
--- End diff --

Sorry @mjsax, my bad. I overlooked the try catch block in the `JobManager`. 
Should work perfectly fine :-)




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-04 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46670412
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 ---
@@ -227,4 +235,121 @@ public void testRequestPartitionState() throws 
Exception {
}
}};
}
+
+   @Test
+   public void testStopSignal() throws Exception {
+   new JavaTestKit(system) {{
+   // Setup
+   TestingCluster cluster = null;
+
+   try {
+   cluster = startTestingCluster(2, 1, 
DEFAULT_AKKA_ASK_TIMEOUT());
+
+   // Create a task
+   final JobVertex sender = new 
JobVertex("Sender");
+   sender.setParallelism(2);
+   
sender.setInvokableClass(StoppableInvokable.class);
+
+   final JobGraph jobGraph = new 
JobGraph("Blocking test job", JobType.STREAMING, sender);
+   final JobID jid = jobGraph.getJobID();
+
+   final ActorGateway jobManagerGateway = 
cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
+
+   // we can set the leader session ID to None 
because we don't use this gateway to send messages
+   final ActorGateway testActorGateway = new 
AkkaActorGateway(getTestActor(), null);
+
+   // Submit the job and wait for all vertices to 
be running
+   jobManagerGateway.tell(
+   new SubmitJob(
+   jobGraph,
+   
ListeningBehaviour.EXECUTION_RESULT),
+   testActorGateway);
+   
expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+   jobManagerGateway.tell(new StopJob(jid), 
testActorGateway);
+
+   // - The test 
--
+   expectMsgClass(StoppingSuccess.class);
--- End diff --

The problem is that `StoppableInvokable` cannot be instantiated. Thus, the 
stop signal does not stop anything because the tasks go directly into the state 
Failed. Furthermore, I noticed that we don't need the `NotifyWhenJobRemoved` 
message because we can wait for the `JobResultSuccess` message. I've fixed the 
test and opened a PR against your branch.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-04 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-161943293
  
When I use the stop button of the web interface, my jobs get cancelled. 
Using the CLI, the stop terminates the job gracefully. Thus, there still 
seems to be an issue with the web interface.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-04 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-161939523
  
Have you added the tests for the REST interface? I couldn't find them. In 
the future it would be helpful if you simply pushed commits to your PR branch 
without squashing them. It is impossible for me to see what you've actually 
changed with your last commit.

Concerning the REST test case, I assume that you have to start a 
`JobManager` with the `WebRuntimeMonitor` running. Then you submit a job and 
send a stop request to the web interface. I would assume that you can use any 
HTTP client for that. Afterwards you can check the JSON result and verify that 
the job has actually been stopped. You can also verify that directly by 
querying the `JobManager`.

It would be important to also add such a test for the YARN use case. There 
you can take a look at the existing YARN test cases to see how to start a YARN 
cluster. It's important because YARN only allows GET requests, if I'm not 
mistaken.
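
For the HTTP part, something along the following lines might be enough. This is only a sketch using the JDK's `HttpURLConnection`; the class `StopRequestSketch` and the paths `/jobs/<id>/stop` and `/jobs/<id>/yarn-stop` are assumptions for illustration and have to be replaced by whatever routes the web monitor actually registers:

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

class StopRequestSketch {
    // Assumption: behind YARN only GET is usable, so that variant uses GET.
    static String method(boolean viaYarn) {
        return viaYarn ? "GET" : "DELETE";
    }

    // Hypothetical paths; check the routes registered by the web monitor.
    static String path(String jobId, boolean viaYarn) {
        return "/jobs/" + jobId + (viaYarn ? "/yarn-stop" : "/stop");
    }

    // Sends the stop request and returns the HTTP status code.
    static int send(String baseUrl, String jobId, boolean viaYarn) throws IOException {
        URL url = new URL(baseUrl + path(jobId, viaYarn));
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try {
            conn.setRequestMethod(method(viaYarn));
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }
}
```

The test would then call `send(...)` against the port of the running web monitor, check the status code, and afterwards ask the `JobManager` whether the job is really gone.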




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-04 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-161949525
  
While fixing the `JobManagerTest` I noticed the following: when the job was 
stopped while it was still in the state `SCHEDULED` or `DEPLOYING`, the sender 
received a `StoppingSuccess`. The problem was that the stop was not executed 
and the job later switched to `RUNNING`.

The same can be observed if the job is in state `RESTARTING`. Stopping a 
restarting job does nothing even though you receive a `StoppingSuccess` 
message. The job will later be redeployed. 

As a user I would expect the job to be stopped immediately, or at least at 
the next possible moment (e.g. when it's deployed). Otherwise, I would expect 
the system to tell me that stopping is not possible at the moment.
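
To illustrate the "next possible moment" variant, here is a small sketch in which a stop request is remembered and applied once the task reaches the running state. The types `SketchTaskState` and `SketchExecution` are hypothetical simplifications and not the existing `Execution` class:

```java
// Hypothetical, simplified task states.
enum SketchTaskState { SCHEDULED, DEPLOYING, RUNNING, STOPPED }

class SketchExecution {
    private SketchTaskState state = SketchTaskState.SCHEDULED;
    private boolean stopRequested = false;

    synchronized SketchTaskState getState() {
        return state;
    }

    // Remembers the request; acts right away only if the task already runs.
    synchronized void requestStop() {
        stopRequested = true;
        if (state == SketchTaskState.RUNNING) {
            state = SketchTaskState.STOPPED;
        }
    }

    // Called once deployment has completed; honors a stop that arrived earlier.
    synchronized void markRunning() {
        state = stopRequested ? SketchTaskState.STOPPED : SketchTaskState.RUNNING;
    }
}
```

The alternative is to reject the request outright while the task is not running, which is simpler but pushes the retry to the user.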

A similar question is what happens if only a subset of all sources is 
deployed and in the state `RUNNING`. The undeployed sources would not be 
notified of the stop signal and would thus be deployed normally.

Furthermore, what happens if the `stop` method of the `SourceFunction` 
throws an unchecked exception? If I'm not mistaken, then this will only get 
logged. But shouldn't the task be cancelled in such a situation because the 
state cannot be guaranteed to be consistent anymore?

The cases that a `Task` is not `Stoppable` and that a `Task` cannot be found 
on the `TaskManager` are treated identically by the `Execution`. Both cases 
cause a `TaskOperationResult(executionID, false, message)` to be sent back to 
the `Execution`, where it is logged that the stopping call "did not find 
the task". I think it would be good to differentiate the two cases.
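
One possible way to differentiate them, sketched with a hypothetical result type that carries an explicit outcome instead of a single boolean. `StopOutcome` and `StopResultSketch` are made-up names and do not exist in the code base:

```java
// Hypothetical outcome type; not the existing TaskOperationResult.
enum StopOutcome { STOPPED, TASK_NOT_FOUND, TASK_NOT_STOPPABLE, STOP_FAILED }

final class StopResultSketch {
    private final String executionId;
    private final StopOutcome outcome;

    StopResultSketch(String executionId, StopOutcome outcome) {
        this.executionId = executionId;
        this.outcome = outcome;
    }

    boolean success() {
        return outcome == StopOutcome.STOPPED;
    }

    // Each failure case gets its own log message.
    String logMessage() {
        switch (outcome) {
            case STOPPED:
                return "Task " + executionId + " was stopped.";
            case TASK_NOT_FOUND:
                return "Stop call did not find task " + executionId + ".";
            case TASK_NOT_STOPPABLE:
                return "Task " + executionId + " does not support stop.";
            case STOP_FAILED:
                return "Stopping task " + executionId + " threw an exception.";
            default:
                throw new IllegalStateException("Unknown outcome: " + outcome);
        }
    }
}
```

The same distinction would also cover the case where `Task.stop` itself throws an exception.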




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-04 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-161982097
  
Thanks. I will take care of it.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-02 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46465735
  
--- Diff: flink-runtime-web/web-dashboard/package.json ---
@@ -7,27 +7,27 @@
   "devDependencies": {
 "browserify": "^9.0.3",
 "coffeeify": "^1.0.0",
-"gulp": "^3.8.11",
+"gulp": "^3.9.0",
 "gulp-browserify": "^0.5.1",
 "gulp-coffee": "^2.3.1",
 "gulp-coffeeify": "^0.1.8",
-"gulp-concat": "^2.5.2",
-"gulp-filter": "^2.0.2",
-"gulp-jade": "^1.0.0",
-"gulp-less": "^3.0.2",
-"gulp-livereload": "^3.8.0",
-"gulp-minify-css": "^1.0.0",
-"gulp-ng-annotate": "^0.5.2",
-"gulp-plumber": "^1.0.0",
+"gulp-concat": "^2.6.0",
+"gulp-filter": "^3.0.1",
+"gulp-jade": "^1.1.0",
+"gulp-less": "^3.0.5",
+"gulp-livereload": "^3.8.1",
+"gulp-minify-css": "^1.2.1",
+"gulp-ng-annotate": "^1.1.0",
+"gulp-plumber": "^1.0.1",
 "gulp-rename": "^1.2.0",
-"gulp-serve": "^0.3.1",
-"gulp-sourcemaps": "^1.5.1",
-"gulp-stylus": "^2.0.1",
-"gulp-uglify": "^1.1.0",
-"gulp-util": "^3.0.4",
+"gulp-serve": "^1.2.0",
+"gulp-sourcemaps": "^1.6.0",
+"gulp-stylus": "^2.1.0",
+"gulp-uglify": "^1.5.1",
+"gulp-util": "^3.0.7",
 "jade": "^1.9.2",
 "jadeify": "^4.1.0",
-"main-bower-files": "^2.6.2",
+"main-bower-files": "^2.9.0",
--- End diff --

I will just revert the file.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-02 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-161460588
  
Updated this PR. Extending the test is the last open question, unless I 
missed anything else.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46399687
  
--- Diff: flink-runtime-web/web-dashboard/package.json ---
@@ -7,27 +7,27 @@
   "devDependencies": {
 "browserify": "^9.0.3",
 "coffeeify": "^1.0.0",
-"gulp": "^3.8.11",
+"gulp": "^3.9.0",
 "gulp-browserify": "^0.5.1",
 "gulp-coffee": "^2.3.1",
 "gulp-coffeeify": "^0.1.8",
-"gulp-concat": "^2.5.2",
-"gulp-filter": "^2.0.2",
-"gulp-jade": "^1.0.0",
-"gulp-less": "^3.0.2",
-"gulp-livereload": "^3.8.0",
-"gulp-minify-css": "^1.0.0",
-"gulp-ng-annotate": "^0.5.2",
-"gulp-plumber": "^1.0.0",
+"gulp-concat": "^2.6.0",
+"gulp-filter": "^3.0.1",
+"gulp-jade": "^1.1.0",
+"gulp-less": "^3.0.5",
+"gulp-livereload": "^3.8.1",
+"gulp-minify-css": "^1.2.1",
+"gulp-ng-annotate": "^1.1.0",
+"gulp-plumber": "^1.0.1",
 "gulp-rename": "^1.2.0",
-"gulp-serve": "^0.3.1",
-"gulp-sourcemaps": "^1.5.1",
-"gulp-stylus": "^2.0.1",
-"gulp-uglify": "^1.1.0",
-"gulp-util": "^3.0.4",
+"gulp-serve": "^1.2.0",
+"gulp-sourcemaps": "^1.6.0",
+"gulp-stylus": "^2.1.0",
+"gulp-uglify": "^1.5.1",
+"gulp-util": "^3.0.7",
 "jade": "^1.9.2",
 "jadeify": "^4.1.0",
-"main-bower-files": "^2.6.2",
+"main-bower-files": "^2.9.0",
--- End diff --

If this is not strictly needed because it's auto-generated, then this file 
should be excluded imo.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-02 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46414197
  
--- Diff: flink-runtime-web/web-dashboard/package.json ---
@@ -7,27 +7,27 @@
   "devDependencies": {
 "browserify": "^9.0.3",
 "coffeeify": "^1.0.0",
-"gulp": "^3.8.11",
+"gulp": "^3.9.0",
 "gulp-browserify": "^0.5.1",
 "gulp-coffee": "^2.3.1",
 "gulp-coffeeify": "^0.1.8",
-"gulp-concat": "^2.5.2",
-"gulp-filter": "^2.0.2",
-"gulp-jade": "^1.0.0",
-"gulp-less": "^3.0.2",
-"gulp-livereload": "^3.8.0",
-"gulp-minify-css": "^1.0.0",
-"gulp-ng-annotate": "^0.5.2",
-"gulp-plumber": "^1.0.0",
+"gulp-concat": "^2.6.0",
+"gulp-filter": "^3.0.1",
+"gulp-jade": "^1.1.0",
+"gulp-less": "^3.0.5",
+"gulp-livereload": "^3.8.1",
+"gulp-minify-css": "^1.2.1",
+"gulp-ng-annotate": "^1.1.0",
+"gulp-plumber": "^1.0.1",
 "gulp-rename": "^1.2.0",
-"gulp-serve": "^0.3.1",
-"gulp-sourcemaps": "^1.5.1",
-"gulp-stylus": "^2.0.1",
-"gulp-uglify": "^1.1.0",
-"gulp-util": "^3.0.4",
+"gulp-serve": "^1.2.0",
+"gulp-sourcemaps": "^1.6.0",
+"gulp-stylus": "^2.1.0",
+"gulp-uglify": "^1.5.1",
+"gulp-util": "^3.0.7",
 "jade": "^1.9.2",
 "jadeify": "^4.1.0",
-"main-bower-files": "^2.6.2",
+"main-bower-files": "^2.9.0",
--- End diff --

@sachingoel0101 can you confirm that this file can be safely removed and 
"git-ignored"?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-02 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46414370
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 ---
@@ -227,4 +235,121 @@ public void testRequestPartitionState() throws 
Exception {
}
}};
}
+
+   @Test
+   public void testStopSignal() throws Exception {
+   new JavaTestKit(system) {{
+   // Setup
+   TestingCluster cluster = null;
+
+   try {
+   cluster = startTestingCluster(2, 1, 
DEFAULT_AKKA_ASK_TIMEOUT());
+
+   // Create a task
+   final JobVertex sender = new 
JobVertex("Sender");
+   sender.setParallelism(2);
+   
sender.setInvokableClass(StoppableInvokable.class);
+
+   final JobGraph jobGraph = new 
JobGraph("Blocking test job", JobType.STREAMING, sender);
+   final JobID jid = jobGraph.getJobID();
+
+   final ActorGateway jobManagerGateway = 
cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
+
+   // we can set the leader session ID to None 
because we don't use this gateway to send messages
+   final ActorGateway testActorGateway = new 
AkkaActorGateway(getTestActor(), null);
+
+   // Submit the job and wait for all vertices to 
be running
+   jobManagerGateway.tell(
+   new SubmitJob(
+   jobGraph,
+   
ListeningBehaviour.EXECUTION_RESULT),
+   testActorGateway);
+   
expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+   jobManagerGateway.tell(new StopJob(jid), 
testActorGateway);
+
+   // - The test 
--
+   expectMsgClass(StoppingSuccess.class);
--- End diff --

@tillrohrmann any input here? I am still stuck on extending this test. Or 
should I just leave it as is?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-02 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46415616
  
--- Diff: flink-runtime-web/web-dashboard/package.json ---
@@ -7,27 +7,27 @@
   "devDependencies": {
 "browserify": "^9.0.3",
 "coffeeify": "^1.0.0",
-"gulp": "^3.8.11",
+"gulp": "^3.9.0",
 "gulp-browserify": "^0.5.1",
 "gulp-coffee": "^2.3.1",
 "gulp-coffeeify": "^0.1.8",
-"gulp-concat": "^2.5.2",
-"gulp-filter": "^2.0.2",
-"gulp-jade": "^1.0.0",
-"gulp-less": "^3.0.2",
-"gulp-livereload": "^3.8.0",
-"gulp-minify-css": "^1.0.0",
-"gulp-ng-annotate": "^0.5.2",
-"gulp-plumber": "^1.0.0",
+"gulp-concat": "^2.6.0",
+"gulp-filter": "^3.0.1",
+"gulp-jade": "^1.1.0",
+"gulp-less": "^3.0.5",
+"gulp-livereload": "^3.8.1",
+"gulp-minify-css": "^1.2.1",
+"gulp-ng-annotate": "^1.1.0",
+"gulp-plumber": "^1.0.1",
 "gulp-rename": "^1.2.0",
-"gulp-serve": "^0.3.1",
-"gulp-sourcemaps": "^1.5.1",
-"gulp-stylus": "^2.0.1",
-"gulp-uglify": "^1.1.0",
-"gulp-util": "^3.0.4",
+"gulp-serve": "^1.2.0",
+"gulp-sourcemaps": "^1.6.0",
+"gulp-stylus": "^2.1.0",
+"gulp-uglify": "^1.5.1",
+"gulp-util": "^3.0.7",
 "jade": "^1.9.2",
 "jadeify": "^4.1.0",
-"main-bower-files": "^2.6.2",
+"main-bower-files": "^2.9.0",
--- End diff --

It is not an auto-generated file; it is basically the set of build utils 
used for the dashboard. Think "Maven build plugins". It is an important part of 
the setup and should not be git-ignored.

Something is strange here: this file should not be changed automatically 
when running the build tools. The changes to this file should not be committed; 
they have nothing to do with re-building the dashboard files.

I am wondering if someone ran a util command that tries to bump all 
dependencies to the latest version.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46418046
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 ---
@@ -227,4 +235,121 @@ public void testRequestPartitionState() throws 
Exception {
}
}};
}
+
+   @Test
+   public void testStopSignal() throws Exception {
+   new JavaTestKit(system) {{
+   // Setup
+   TestingCluster cluster = null;
+
+   try {
+   cluster = startTestingCluster(2, 1, 
DEFAULT_AKKA_ASK_TIMEOUT());
+
+   // Create a task
+   final JobVertex sender = new 
JobVertex("Sender");
+   sender.setParallelism(2);
+   
sender.setInvokableClass(StoppableInvokable.class);
+
+   final JobGraph jobGraph = new 
JobGraph("Blocking test job", JobType.STREAMING, sender);
+   final JobID jid = jobGraph.getJobID();
+
+   final ActorGateway jobManagerGateway = 
cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
+
+   // we can set the leader session ID to None 
because we don't use this gateway to send messages
+   final ActorGateway testActorGateway = new 
AkkaActorGateway(getTestActor(), null);
+
+   // Submit the job and wait for all vertices to 
be running
+   jobManagerGateway.tell(
+   new SubmitJob(
+   jobGraph,
+   
ListeningBehaviour.EXECUTION_RESULT),
+   testActorGateway);
+   
expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+   jobManagerGateway.tell(new StopJob(jid), 
testActorGateway);
+
+   // - The test 
--
+   expectMsgClass(StoppingSuccess.class);
--- End diff --

Will take a look asap. I have to finish another task first.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46307098
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 ---
@@ -227,4 +235,121 @@ public void testRequestPartitionState() throws 
Exception {
}
}};
}
+
+   @Test
+   public void testStopSignal() throws Exception {
+   new JavaTestKit(system) {{
+   // Setup
+   TestingCluster cluster = null;
+
+   try {
+   cluster = startTestingCluster(2, 1, 
DEFAULT_AKKA_ASK_TIMEOUT());
+
+   // Create a task
+   final JobVertex sender = new 
JobVertex("Sender");
+   sender.setParallelism(2);
+   
sender.setInvokableClass(StoppableInvokable.class);
+
+   final JobGraph jobGraph = new 
JobGraph("Blocking test job", JobType.STREAMING, sender);
+   final JobID jid = jobGraph.getJobID();
+
+   final ActorGateway jobManagerGateway = 
cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
+
+   // we can set the leader session ID to None 
because we don't use this gateway to send messages
+   final ActorGateway testActorGateway = new 
AkkaActorGateway(getTestActor(), null);
+
+   // Submit the job and wait for all vertices to 
be running
+   jobManagerGateway.tell(
+   new SubmitJob(
+   jobGraph,
+   
ListeningBehaviour.EXECUTION_RESULT),
+   testActorGateway);
+   
expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+   jobManagerGateway.tell(new StopJob(jid), 
testActorGateway);
+
+   // - The test 
--
+   expectMsgClass(StoppingSuccess.class);
--- End diff --

I tried to make this work, but without success... I added the following two 
lines:
```
jobManagerGateway.tell(new NotifyWhenJobRemoved(jid), testActorGateway);
expectMsgEquals(true);
```
But the returned message is `JobResultFailure` instead of `true` (I was 
looking at `JobManagerITCase.scala` in which `NotifyWhenJobRemoved` is also 
used and which expects `true` as return value).

As I am not familiar with Akka I need your input here.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46305433
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -732,6 +758,18 @@ else if (current == JobStatus.RESTARTING) {
}
}
 
+   public void stop() {
+   if(jobType == JobType.STREAMING) {
+   for(ExecutionVertex ev : 
this.getAllExecutionVertices()) {
+   if(ev.getNumberOfInputs() == 0) { // send 
signal to sources only
+   ev.stop();
+   }
+   }
+   } else {
+   throw new RuntimeException("STOP is only supported by 
streaming jobs.");
--- End diff --

I think it is a bit harsh to throw a `RuntimeException` in the case of a 
batch job. This effectively means that whenever you call stop on a batch job, 
the `JobManager` will die with a `RuntimeException`. I think it is better to 
simply tell the user that stop is not supported for batch jobs or to call the 
cancel method for batch jobs.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46308707
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -732,6 +758,18 @@ else if (current == JobStatus.RESTARTING) {
}
}
 
+   public void stop() {
+   if(jobType == JobType.STREAMING) {
+   for(ExecutionVertex ev : 
this.getAllExecutionVertices()) {
+   if(ev.getNumberOfInputs() == 0) { // send 
signal to sources only
+   ev.stop();
+   }
+   }
+   } else {
+   throw new RuntimeException("STOP is only supported by 
streaming jobs.");
--- End diff --

This is not a problem: in `JobManager.scala` a potential `Throwable` is 
caught when `ExecutionGraph.stop()` is called and a proper `StoppingFailure` 
message is sent back as response. I tested this in local mode. The job manager 
did not go down. Or is there any other place in the code in which this could 
become a problem? As an alternative, we could introduce a regular exception 
(i.e., `StoppingException`) and add a `throws` clause to `stop()`. What do you 
think about this?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-161044122
  
Do I have to build the web resources as well? I ask because I don't see the
stop button.

Thanks for your contribution @mjsax. The PR is in good shape.

I think it would be good to add tests which test the REST stopping
interface for a standalone cluster and a YARN cluster. Furthermore, the
JobManager shouldn't fail with a `RuntimeException` when the CLI frontend tries
to stop a batch job.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-161045939
  
Actually, all changes produced by running `gulp` in
`flink-runtime-web/web-dashboard` are committed, so there should be no need to
build them. I had similar problems because the WebUI caches stuff in `/tmp` --
try to clean it there (clearing the browser cache might also help).

About `RuntimeException` and CLI -- that should work already as you wish.
But please try it out again to verify. About the REST and YARN tests: I never
wrote a test like this. Can you give some pointers on how to do this?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46259453
  
--- Diff: docs/apis/cli.md ---
@@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs.
  -s,--scheduled   Show only scheduled programs and their JobIDs
 
 
+Action "stop" stops a running program (streaming jobs only).
+
+  Syntax: stop [OPTIONS] 
+  "stop" action options:
+ -m,--jobmanager   Address of the JobManager (master) to which
+                   to connect. Specify 'yarn-cluster' as the
+                   JobManager to deploy a YARN cluster for the
+                   job. Use this flag to connect to a different
--- End diff --

It would be nice if @mjsax could remove the sentence: 
"Specify 'yarn-cluster' as the JobManager to deploy a YARN cluster for the 
job. "
from the description. The "stop" command is not about deploying a job ;)




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46290038
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -112,19 +114,24 @@
 
/** The log object used for debugging. */
static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
-   
+
+   private static final int NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN = 1;
--- End diff --

IntelliJ says that this variable is never used.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46290433
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 ---
@@ -227,6 +229,10 @@ public ExecutionState getAggregateState() {

return getAggregateJobVertexState(num, parallelism);
}
+
+   public boolean receivedStopSignal() {
--- End diff --

What do we need this method and the `receivedStopSignal` field for?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46290465
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 ---
@@ -227,4 +235,121 @@ public void testRequestPartitionState() throws Exception {
}
}};
}
+
+   @Test
+   public void testStopSignal() throws Exception {
+       new JavaTestKit(system) {{
+           // Setup
+           TestingCluster cluster = null;
+
+           try {
+               cluster = startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT());
+
+               // Create a task
+               final JobVertex sender = new JobVertex("Sender");
+               sender.setParallelism(2);
+               sender.setInvokableClass(StoppableInvokable.class);
+
+               final JobGraph jobGraph = new JobGraph("Blocking test job", JobType.STREAMING, sender);
+               final JobID jid = jobGraph.getJobID();
+
+               final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
+
+               // we can set the leader session ID to None because we don't use this gateway to send messages
+               final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+
+               // Submit the job and wait for all vertices to be running
+               jobManagerGateway.tell(
+                   new SubmitJob(
+                       jobGraph,
+                       ListeningBehaviour.EXECUTION_RESULT),
+                   testActorGateway);
+               expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+               jobManagerGateway.tell(new StopJob(jid), testActorGateway);
+
+               // - The test --
+               expectMsgClass(StoppingSuccess.class);
--- End diff --

This might end up in a race condition. STOP only sets a flag that tells the
sources to return from `run()` -- however, the job itself is still in state
`RUNNING` before this happens, thus it will not be removed from `currentJobs`
until the sources (actually the whole job) have finished. If we check too
early, this test fails. I would rather leave it as is -- otherwise, we
introduce an instability into this test. Let me know if you disagree.
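
A small sketch of why checking right after the stop signal is racy, assuming
a hypothetical `StoppableSource` whose `stop()` merely flips a flag. None of
these classes are Flink's; the point is only that the acknowledgement of the
signal and the actual return from `run()` are two separate events, so a check
has to wait for the second one.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Sketch only: a hypothetical source, not Flink's real source or task classes. */
class StopFlagSketch {

    /** stop() only flips a flag; run() notices it later and returns on its own. */
    static class StoppableSource implements Runnable {
        private volatile boolean running = true;
        final CountDownLatch finished = new CountDownLatch(1);

        @Override
        public void run() {
            while (running) {
                Thread.yield(); // stands in for emitting records
            }
            finished.countDown(); // only now has the source really finished
        }

        void stop() {
            running = false;
        }
    }

    /** Starts a source, signals stop, and waits (bounded) for run() to return. */
    static boolean runAndStop(long timeoutMillis) {
        StoppableSource source = new StoppableSource();
        Thread worker = new Thread(source, "source");
        worker.setDaemon(true);
        worker.start();

        source.stop(); // returns immediately, but the worker may still be running
        try {
            // Waiting on the latch avoids the race; an immediate state check would not.
            return source.finished.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("source finished after stop: " + runAndStop(5000));
    }
}
```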




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46291411
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
 ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.taskmanager;
+
+import java.lang.reflect.Field;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.Stoppable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ TaskDeploymentDescriptor.class, JobID.class, FiniteDuration.class })
+public class TaskStopTest {
+   private AbstractInvokable taskMock;
+   private Task task;
+
+   public void doMocking() throws Exception {
+       TaskDeploymentDescriptor tddMock = mock(TaskDeploymentDescriptor.class);
+       when(tddMock.getNumberOfSubtasks()).thenReturn(1);
+       when(tddMock.getJobID()).thenReturn(mock(JobID.class));
+       when(tddMock.getVertexID()).thenReturn(mock(JobVertexID.class));
+       when(tddMock.getExecutionId()).thenReturn(mock(ExecutionAttemptID.class));
+       when(tddMock.getTaskName()).thenReturn("taskName");
+       when(tddMock.getJobConfiguration()).thenReturn(mock(Configuration.class));
+       when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class));
+       when(tddMock.getInvokableClassName()).thenReturn("className");
+
+       task = new Task(tddMock, mock(MemoryManager.class), mock(IOManager.class), mock(NetworkEnvironment.class),
+               mock(BroadcastVariableManager.class), mock(ActorGateway.class), mock(ActorGateway.class),
+               mock(FiniteDuration.class), mock(LibraryCacheManager.class), mock(FileCache.class),
+               mock(TaskManagerRuntimeInfo.class));
+       Field f = task.getClass().getDeclaredField("invokable");
+       f.setAccessible(true);
+       f.set(task, taskMock);
--- End diff --

The private member `Task.invokable` will be `null` until it is created and
set in `Task.run()`, which is never called in this test.
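
For readers unfamiliar with the trick, here is a self-contained sketch of
setting a private field through reflection so that a test can exercise code
that depends on it. The `Task` class below is a tiny stand-in with a
hypothetical `stopExecution()` method, not Flink's real `Task`, and
`setPrivateField` is an illustrative helper.

```java
import java.lang.reflect.Field;

/** Sketch only: a stand-in Task class, not Flink's real Task. */
class PrivateFieldInjectionSketch {

    static class Task {
        private Runnable invokable; // in this sketch, never set through normal code paths

        /** Returns false when there is nothing to stop yet. */
        boolean stopExecution() {
            if (invokable == null) {
                return false;
            }
            invokable.run();
            return true;
        }
    }

    /** Sets a private field by name; wraps reflective failures in an unchecked exception. */
    static void setPrivateField(Object target, String fieldName, Object value) {
        try {
            Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            field.set(target, value);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not set field " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        Task task = new Task();
        System.out.println("before injection: " + task.stopExecution());

        setPrivateField(task, "invokable", (Runnable) () -> System.out.println("stop called"));
        System.out.println("after injection: " + task.stopExecution());
    }
}
```

The drawback of this approach is that the field name is a string, so a rename
of the field breaks the test only at run time.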




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46290688
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -420,6 +420,23 @@ class JobManager(
   )
   }
 
+case StopJob(jobID) =>
+  log.info(s"Trying to stop job withID $jobID.")
--- End diff --

whitespace




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46292645
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -112,19 +114,24 @@
 
/** The log object used for debugging. */
static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
-   
+
+   private static final int NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN = 1;
--- End diff --

Seems to be a rebasing error (an incorrectly resolved merge conflict). Will
remove it.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46293301
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 ---
@@ -227,6 +229,10 @@ public ExecutionState getAggregateState() {

return getAggregateJobVertexState(num, parallelism);
}
+
+   public boolean receivedStopSignal() {
--- End diff --

The old WebUI used it (this code got deleted when we switched to the new UI)
-- I guess it is obsolete now. Will remove it.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46295358
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+public class ExecutionJobVertexTest {
+
+   @Test
+   public void testReceivedStopSignal() throws JobException {
--- End diff --

This is obsolete now. I just removed the whole test, because I reverted my
changes to `ExecutionJobVertex`. Initially, I just added a new test for the
stuff I added to `ExecutionJobVertex`. Of course it would be nice to test more
-- I just felt this would be out of scope for this PR.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46294804
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
 ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobType;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.api.mockito.PowerMockito;
+
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.same;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionGraph.class)
+public class ExecutionGraphSignalsTest {
+   private ExecutionJobVertex[] mockEJV = new ExecutionJobVertex[5];
+   private int[] dop = new int[] { 5, 7, 2, 11, 4 };
+   private ExecutionVertex[][] mockEV = new ExecutionVertex[mockEJV.length][];
+   private ExecutionGraph eg;
+   private Field f;
+
+   @Before
+   public void prepare() throws Exception {
+       final JobID jobId = new JobID();
+       final String jobName = "Test Job Sample Name";
+       final Configuration cfg = new Configuration();
+
+
+       assert (mockEJV.length == 5);
+       JobVertex v1 = new JobVertex("vertex1");
+       JobVertex v2 = new JobVertex("vertex2");
+       JobVertex v3 = new JobVertex("vertex3");
+       JobVertex v4 = new JobVertex("vertex4");
+       JobVertex v5 = new JobVertex("vertex5");
+
+       for(int i = 0; i < mockEJV.length; ++i) {
+           mockEJV[i] = mock(ExecutionJobVertex.class);
+
+           this.mockEV[i] = new ExecutionVertex[dop[i]];
+           for (int j = 0; j < dop[i]; ++j) {
+               this.mockEV[i][j] = mock(ExecutionVertex.class);
+           }
+
+           when(mockEJV[i].getProducedDataSets()).thenReturn(new IntermediateResult[0]);
+           when(mockEJV[i].getTaskVertices()).thenReturn(this.mockEV[i]);
+       }
+
+       PowerMockito
+           .whenNew(ExecutionJobVertex.class)
+           .withArguments(any(ExecutionGraph.class), same(v1), any(Integer.class).intValue(),
+               any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[0]);
+       PowerMockito
+           .whenNew(ExecutionJobVertex.class)
+           .withArguments(any(ExecutionGraph.class), same(v2), any(Integer.class).intValue(),
+               any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[1]);
+       PowerMockito
+           .whenNew(ExecutionJobVertex.class)
+           .withArguments(any(ExecutionGraph.class), same(v3), any(Integer.class).intValue(),
+               any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[2]);
+       PowerMockito
   

[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46284702
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -569,6 +571,69 @@ public int compare(JobStatusMessage o1, JobStatusMessage o2) {
}
 
    /**
+    * Executes the STOP action.
+    * 
+    * @param args Command line arguments for the stop action.
+    */
+   protected int stop(String[] args) {
+       LOG.info("Running 'stop' command.");
+
+       StopOptions options;
+       try {
+           options = CliFrontendParser.parseStopCommand(args);
+       }
+       catch (CliArgsException e) {
+           return handleArgException(e);
+       }
+       catch (Throwable t) {
+           return handleError(t);
+       }
+
+       // evaluate help flag
+       if (options.isPrintHelp()) {
+           CliFrontendParser.printHelpForStop();
+           return 0;
+       }
+
+       String[] stopArgs = options.getArgs();
+       JobID jobId;
+
+       if (stopArgs.length > 0) {
+           String jobIdString = stopArgs[0];
+           try {
+               jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
+           }
+           catch (Exception e) {
+               LOG.error("Error: The value for the Job ID is not a valid ID.");
--- End diff --

The thing is that `handleError` was introduced after the `cancel` method
was written, so it was not available at the time. So why should we keep the
deficiencies of an old implementation if we could do it better? Of course, it
would be nice to update `cancel` accordingly, but this is not part of this PR
and should happen in a different PR.

IMHO, it is better to ditch antiquated traditions if you're implementing a
new function.
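
To illustrate the suggestion, here is a rough sketch in which every failure
path of a stop command goes through one error handler instead of ad-hoc
logging. This is not the real `CliFrontend` code: `StopCommandSketch`,
`parseJobId`, and the simplified `handleError` are hypothetical, and the
32-hex-digit ID length is an assumption made for this example.

```java
/** Sketch only: hypothetical helpers, not the real CliFrontend code. */
class StopCommandSketch {

    /** Single place that reports an error and maps it to a non-zero exit code. */
    static int handleError(Throwable t) {
        System.err.println("Error: " + t.getMessage());
        return 1;
    }

    /** Parses a hex string into bytes; the 16-byte length is an assumption of this sketch. */
    static byte[] parseJobId(String hex) {
        if (hex.length() != 32) {
            throw new IllegalArgumentException("The value for the Job ID is not a valid ID.");
        }
        byte[] bytes = new byte[16];
        for (int i = 0; i < bytes.length; i++) {
            int hi = Character.digit(hex.charAt(2 * i), 16);
            int lo = Character.digit(hex.charAt(2 * i + 1), 16);
            if (hi < 0 || lo < 0) {
                throw new IllegalArgumentException("The value for the Job ID is not a valid ID.");
            }
            bytes[i] = (byte) ((hi << 4) | lo);
        }
        return bytes;
    }

    /** Returns 0 on success and a non-zero code on any error. */
    static int stop(String[] args) {
        if (args.length == 0) {
            return handleError(new IllegalArgumentException("Missing JobID."));
        }
        try {
            parseJobId(args[0]);
        } catch (IllegalArgumentException e) {
            return handleError(e); // same path as every other failure
        }
        // A real implementation would now send the stop request.
        return 0;
    }

    public static void main(String[] args) {
        System.out.println(stop(new String[] {"not-a-job-id"}));
        System.out.println(stop(new String[] {"00112233445566778899aabbccddeeff"}));
    }
}
```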




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46285044
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import java.util.UUID;
+
+import akka.actor.*;
+import akka.testkit.JavaTestKit;
+
+import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import scala.Option;
+import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
+import static org.junit.Assert.*;
+
+public class CliFrontendStopTest {
+
+   private static ActorSystem actorSystem;
+
+   @BeforeClass
+   public static void setup() {
+   pipeSystemOutToNull();
+   actorSystem = ActorSystem.create("TestingActorSystem");
+   }
+
+   @AfterClass
+   public static void teardown() {
+   JavaTestKit.shutdownActorSystem(actorSystem);
+   actorSystem = null;
+   }
+
+   @BeforeClass
+   public static void init() {
--- End diff --

For me it doesn't make sense to have multiple `@BeforeClass` methods. IMO, 
you should change it here and address `CliFrontendCancelTest` as part of 
another PR.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46285844
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
 ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobType;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.api.mockito.PowerMockito;
+
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.same;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionGraph.class)
+public class ExecutionGraphSignalsTest {
+   private ExecutionJobVertex[] mockEJV = new ExecutionJobVertex[5];
+   private int[] dop = new int[] { 5, 7, 2, 11, 4 };
+   private ExecutionVertex[][] mockEV = new ExecutionVertex[mockEJV.length][];
+   private ExecutionGraph eg;
+   private Field f;
+
+   @Before
+   public void prepare() throws Exception {
+       final JobID jobId = new JobID();
+       final String jobName = "Test Job Sample Name";
+       final Configuration cfg = new Configuration();
+
+
+       assert (mockEJV.length == 5);
+       JobVertex v1 = new JobVertex("vertex1");
+       JobVertex v2 = new JobVertex("vertex2");
+       JobVertex v3 = new JobVertex("vertex3");
+       JobVertex v4 = new JobVertex("vertex4");
+       JobVertex v5 = new JobVertex("vertex5");
+
+       for(int i = 0; i < mockEJV.length; ++i) {
+           mockEJV[i] = mock(ExecutionJobVertex.class);
+
+           this.mockEV[i] = new ExecutionVertex[dop[i]];
+           for (int j = 0; j < dop[i]; ++j) {
+               this.mockEV[i][j] = mock(ExecutionVertex.class);
+           }
+
+           when(mockEJV[i].getProducedDataSets()).thenReturn(new IntermediateResult[0]);
+           when(mockEJV[i].getTaskVertices()).thenReturn(this.mockEV[i]);
+       }
+
+       PowerMockito
+           .whenNew(ExecutionJobVertex.class)
+           .withArguments(any(ExecutionGraph.class), same(v1), any(Integer.class).intValue(),
+               any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[0]);
+       PowerMockito
+           .whenNew(ExecutionJobVertex.class)
+           .withArguments(any(ExecutionGraph.class), same(v2), any(Integer.class).intValue(),
+               any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[1]);
+       PowerMockito
+           .whenNew(ExecutionJobVertex.class)
+           .withArguments(any(ExecutionGraph.class), same(v3), any(Integer.class).intValue(),
+               any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[2]);
+   

[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46286009
  
--- Diff: flink-runtime-web/web-dashboard/package.json ---
@@ -7,27 +7,27 @@
   "devDependencies": {
 "browserify": "^9.0.3",
 "coffeeify": "^1.0.0",
-"gulp": "^3.8.11",
+"gulp": "^3.9.0",
 "gulp-browserify": "^0.5.1",
 "gulp-coffee": "^2.3.1",
 "gulp-coffeeify": "^0.1.8",
-"gulp-concat": "^2.5.2",
-"gulp-filter": "^2.0.2",
-"gulp-jade": "^1.0.0",
-"gulp-less": "^3.0.2",
-"gulp-livereload": "^3.8.0",
-"gulp-minify-css": "^1.0.0",
-"gulp-ng-annotate": "^0.5.2",
-"gulp-plumber": "^1.0.0",
+"gulp-concat": "^2.6.0",
+"gulp-filter": "^3.0.1",
+"gulp-jade": "^1.1.0",
+"gulp-less": "^3.0.5",
+"gulp-livereload": "^3.8.1",
+"gulp-minify-css": "^1.2.1",
+"gulp-ng-annotate": "^1.1.0",
+"gulp-plumber": "^1.0.1",
 "gulp-rename": "^1.2.0",
-"gulp-serve": "^0.3.1",
-"gulp-sourcemaps": "^1.5.1",
-"gulp-stylus": "^2.0.1",
-"gulp-uglify": "^1.1.0",
-"gulp-util": "^3.0.4",
+"gulp-serve": "^1.2.0",
+"gulp-sourcemaps": "^1.6.0",
+"gulp-stylus": "^2.1.0",
+"gulp-uglify": "^1.5.1",
+"gulp-util": "^3.0.7",
 "jade": "^1.9.2",
 "jadeify": "^4.1.0",
-"main-bower-files": "^2.6.2",
+"main-bower-files": "^2.9.0",
--- End diff --

The file was added by @StephanEwen -- it got updated when I ran `gulp` so I 
just committed the changes. To me, it would make sense to remove the file -- 
not sure why it is there... (maybe we should .gitignore it after removal)




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46286695
  
--- Diff: docs/apis/cli.md ---
@@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs.
      -s,--scheduled   Show only scheduled programs and their JobIDs
 
 
+Action "stop" stops a running program (streaming jobs only).
+
+  Syntax: stop [OPTIONS] 
+  "stop" action options:
+     -m,--jobmanager   Address of the JobManager (master) to which
+                       to connect. Specify 'yarn-cluster' as the
+                       JobManager to deploy a YARN cluster for the
+                       job. Use this flag to connect to a different
--- End diff --

:+1:




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46285228
  
--- Diff: flink-runtime-web/web-dashboard/package.json ---
@@ -7,27 +7,27 @@
   "devDependencies": {
 "browserify": "^9.0.3",
 "coffeeify": "^1.0.0",
-"gulp": "^3.8.11",
+"gulp": "^3.9.0",
 "gulp-browserify": "^0.5.1",
 "gulp-coffee": "^2.3.1",
 "gulp-coffeeify": "^0.1.8",
-"gulp-concat": "^2.5.2",
-"gulp-filter": "^2.0.2",
-"gulp-jade": "^1.0.0",
-"gulp-less": "^3.0.2",
-"gulp-livereload": "^3.8.0",
-"gulp-minify-css": "^1.0.0",
-"gulp-ng-annotate": "^0.5.2",
-"gulp-plumber": "^1.0.0",
+"gulp-concat": "^2.6.0",
+"gulp-filter": "^3.0.1",
+"gulp-jade": "^1.1.0",
+"gulp-less": "^3.0.5",
+"gulp-livereload": "^3.8.1",
+"gulp-minify-css": "^1.2.1",
+"gulp-ng-annotate": "^1.1.0",
+"gulp-plumber": "^1.0.1",
 "gulp-rename": "^1.2.0",
-"gulp-serve": "^0.3.1",
-"gulp-sourcemaps": "^1.5.1",
-"gulp-stylus": "^2.0.1",
-"gulp-uglify": "^1.1.0",
-"gulp-util": "^3.0.4",
+"gulp-serve": "^1.2.0",
+"gulp-sourcemaps": "^1.6.0",
+"gulp-stylus": "^2.1.0",
+"gulp-uglify": "^1.5.1",
+"gulp-util": "^3.0.7",
 "jade": "^1.9.2",
 "jadeify": "^4.1.0",
-"main-bower-files": "^2.6.2",
+"main-bower-files": "^2.9.0",
--- End diff --

Does this mean that the file is generated by you? If this is the case, why 
do we include generated files in the source code repository?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46285468
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
@@ -402,21 +405,53 @@ public void onComplete(Throwable failure, Object success) throws Throwable {
         }
     }
 
+    public void stop() {
+        // sends stop RPC call
+
+        final SimpleSlot slot = this.assignedResource;
+
+        if (slot != null) {
+            final ActorGateway gateway = slot.getInstance().getActorGateway();
+
+            Future stopResult = gateway.retry(
+                new StopTask(attemptId),
+                NUM_STOP_CALL_TRIES,
+                timeout,
+                executionContext);
+
+            stopResult.onComplete(new OnComplete() {
+                @Override
+                public void onComplete(Throwable failure, Object success) throws Throwable {
+                    if (failure != null) {
+                        fail(new Exception("Task could not be stopped.", failure));
+                    } else {
+                        TaskOperationResult result = (TaskOperationResult) success;
+                        if (!result.success()) {
+                            LOG.debug("Stopping task call did not find task. Probably akka message call race.");
--- End diff --

Again, sticking to antiquated traditions is IMHO not a good option if one 
can improve on them. Including the change to `cancel`'s `onComplete` would be 
OK for me here. In general, though, orthogonal changes should go into a 
separate PR.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46287079
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.BaseTestingActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import scala.concurrent.ExecutionContext;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionVertex.class)
+public class ExecutionVertexStopTest {
+
+   @Test
+   public void testStop() throws Exception {
--- End diff --

I guess you could also test that in a somewhat broader context. But that's 
more for the future, since you've already written the test.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46287242
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobType;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.api.mockito.PowerMockito;
+
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.same;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionGraph.class)
+public class ExecutionGraphSignalsTest {
+   private ExecutionJobVertex[] mockEJV = new ExecutionJobVertex[5];
+   private int[] dop = new int[] { 5, 7, 2, 11, 4 };
+   private ExecutionVertex[][] mockEV = new ExecutionVertex[mockEJV.length][];
+   private ExecutionGraph eg;
+   private Field f;
+
+   @Before
+   public void prepare() throws Exception {
+   final JobID jobId = new JobID();
+   final String jobName = "Test Job Sample Name";
+   final Configuration cfg = new Configuration();
+
+
+   assert (mockEJV.length == 5);
+   JobVertex v1 = new JobVertex("vertex1");
+   JobVertex v2 = new JobVertex("vertex2");
+   JobVertex v3 = new JobVertex("vertex3");
+   JobVertex v4 = new JobVertex("vertex4");
+   JobVertex v5 = new JobVertex("vertex5");
+
+   for(int i = 0; i < mockEJV.length; ++i) {
+   mockEJV[i] = mock(ExecutionJobVertex.class);
+
+   this.mockEV[i] = new ExecutionVertex[dop[i]];
+   for (int j = 0; j < dop[i]; ++j) {
+   this.mockEV[i][j] = mock(ExecutionVertex.class);
+   }
+
+   when(mockEJV[i].getProducedDataSets()).thenReturn(new IntermediateResult[0]);
+   when(mockEJV[i].getTaskVertices()).thenReturn(this.mockEV[i]);
+   }
+
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+   .withArguments(any(ExecutionGraph.class), same(v1), any(Integer.class).intValue(),
+   any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[0]);
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+   .withArguments(any(ExecutionGraph.class), same(v2), any(Integer.class).intValue(),
+   any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[1]);
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+   .withArguments(any(ExecutionGraph.class), same(v3), any(Integer.class).intValue(),
+   any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[2]);
+   

[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46287141
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -569,6 +571,69 @@ public int compare(JobStatusMessage o1, JobStatusMessage o2) {
     }
 
     /**
+     * Executes the STOP action.
+     *
+     * @param args Command line arguments for the stop action.
+     */
+    protected int stop(String[] args) {
+        LOG.info("Running 'stop' command.");
+
+        StopOptions options;
+        try {
+            options = CliFrontendParser.parseStopCommand(args);
+        }
+        catch (CliArgsException e) {
+            return handleArgException(e);
+        }
+        catch (Throwable t) {
+            return handleError(t);
+        }
+
+        // evaluate help flag
+        if (options.isPrintHelp()) {
+            CliFrontendParser.printHelpForStop();
+            return 0;
+        }
+
+        String[] stopArgs = options.getArgs();
+        JobID jobId;
+
+        if (stopArgs.length > 0) {
+            String jobIdString = stopArgs[0];
+            try {
+                jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
+            }
+            catch (Exception e) {
+                LOG.error("Error: The value for the Job ID is not a valid ID.");
--- End diff --

Will change it only here. (I'm just not a big fan of that, as the other code 
will potentially stay "old" forever. Nobody rewrites it "just for fun" -- if 
we don't address it now, while we are aware of the old pattern, we will forget 
about updating the other parts.)




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46277994
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobType;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.api.mockito.PowerMockito;
+
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.same;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionGraph.class)
+public class ExecutionGraphSignalsTest {
+   private ExecutionJobVertex[] mockEJV = new ExecutionJobVertex[5];
+   private int[] dop = new int[] { 5, 7, 2, 11, 4 };
+   private ExecutionVertex[][] mockEV = new ExecutionVertex[mockEJV.length][];
+   private ExecutionGraph eg;
+   private Field f;
+
+   @Before
+   public void prepare() throws Exception {
+   final JobID jobId = new JobID();
+   final String jobName = "Test Job Sample Name";
+   final Configuration cfg = new Configuration();
+
+
+   assert (mockEJV.length == 5);
+   JobVertex v1 = new JobVertex("vertex1");
+   JobVertex v2 = new JobVertex("vertex2");
+   JobVertex v3 = new JobVertex("vertex3");
+   JobVertex v4 = new JobVertex("vertex4");
+   JobVertex v5 = new JobVertex("vertex5");
+
+   for(int i = 0; i < mockEJV.length; ++i) {
+   mockEJV[i] = mock(ExecutionJobVertex.class);
+
+   this.mockEV[i] = new ExecutionVertex[dop[i]];
+   for (int j = 0; j < dop[i]; ++j) {
+   this.mockEV[i][j] = mock(ExecutionVertex.class);
+   }
+
+   when(mockEJV[i].getProducedDataSets()).thenReturn(new IntermediateResult[0]);
+   when(mockEJV[i].getTaskVertices()).thenReturn(this.mockEV[i]);
+   }
+
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+   .withArguments(any(ExecutionGraph.class), same(v1), any(Integer.class).intValue(),
+   any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[0]);
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+   .withArguments(any(ExecutionGraph.class), same(v2), any(Integer.class).intValue(),
+   any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[1]);
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+   .withArguments(any(ExecutionGraph.class), same(v3), any(Integer.class).intValue(),
+   any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[2]);
+   

[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46280916
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---
@@ -227,4 +235,121 @@ public void testRequestPartitionState() throws Exception {
             }
         }};
     }
+
+    @Test
+    public void testStopSignal() throws Exception {
+        new JavaTestKit(system) {{
+            // Setup
+            TestingCluster cluster = null;
+
+            try {
+                cluster = startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT());
+
+                // Create a task
+                final JobVertex sender = new JobVertex("Sender");
+                sender.setParallelism(2);
+                sender.setInvokableClass(StoppableInvokable.class);
+
+                final JobGraph jobGraph = new JobGraph("Blocking test job", JobType.STREAMING, sender);
+                final JobID jid = jobGraph.getJobID();
+
+                final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
+
+                // we can set the leader session ID to None because we don't use this gateway to send messages
+                final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+
+                // Submit the job and wait for all vertices to be running
+                jobManagerGateway.tell(
+                    new SubmitJob(
+                        jobGraph,
+                        ListeningBehaviour.EXECUTION_RESULT),
+                    testActorGateway);
+                expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+                jobManagerGateway.tell(new StopJob(jid), testActorGateway);
+
+                // - The test --
+                expectMsgClass(StoppingSuccess.class);
--- End diff --

Maybe we could check that the job has actually been removed from 
`currentJobs` in the `JobManager`.
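
A minimal sketch of what such a check could look like, assuming the removal happens asynchronously and the test therefore has to poll. It is self-contained and uses no Flink classes: the `currentJobs` map below is only a plain stand-in for the `JobManager` field, and `waitUntil` and `jobIsRemovedAfterStop` are hypothetical helper names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

public class JobRemovalCheckSketch {

    /** Polls the condition every 10 ms until it holds or the timeout elapses. */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis) {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (System.nanoTime() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        // one last check in case the condition became true right at the deadline
        return condition.getAsBoolean();
    }

    /** Stand-in scenario: a background thread removes the job, the caller polls for it. */
    static boolean jobIsRemovedAfterStop() {
        // hypothetical stand-in for the JobManager's currentJobs map
        final Map<String, String> currentJobs = new ConcurrentHashMap<>();
        final String jobId = "job-1";
        currentJobs.put(jobId, "RUNNING");

        // simulates the asynchronous cleanup that follows a successful stop
        Thread cleanup = new Thread(() -> currentJobs.remove(jobId));
        cleanup.start();

        return waitUntil(() -> !currentJobs.containsKey(jobId), 2000);
    }

    public static void main(String[] args) {
        System.out.println("job removed: " + jobIsRemovedAfterStop());
    }
}
```

In the real test the condition would query the `JobManager` rather than a local map; how that state is exposed to the test is left open here.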




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46283446
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.BaseTestingActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import scala.concurrent.ExecutionContext;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionVertex.class)
+public class ExecutionVertexStopTest {
+
+   @Test
+   public void testStop() throws Exception {
--- End diff --

Yes. The call must be "forwarded". The idea is to test whether the signal is 
correctly propagated through the whole stack... (Same argument as above: 
rather one test too many than one too few. I admit that I am very strict 
[maybe too strict] about testing -- to me, EVERYTHING should get tested.)
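
To illustrate what "forwarded through the whole stack" means, here is a minimal, self-contained sketch. It deliberately uses hand-written stand-ins instead of the Flink classes or a mocking framework: `FakeExecution`, `FakeVertex`, `FakeJobVertex` and `stopAndCount` are hypothetical names, and the layering only mimics the job vertex / vertex / execution hierarchy.

```java
import java.util.ArrayList;
import java.util.List;

public class StopPropagationSketch {

    /** Stand-in for the lowest layer; it only records that stop() arrived. */
    static class FakeExecution {
        int stopCalls = 0;

        void stop() {
            stopCalls++;
        }
    }

    /** Stand-in for a vertex that forwards stop() to its current execution. */
    static class FakeVertex {
        final FakeExecution execution = new FakeExecution();

        void stop() {
            execution.stop();
        }
    }

    /** Stand-in for a job vertex that forwards stop() to all parallel subtasks. */
    static class FakeJobVertex {
        final List<FakeVertex> subtasks = new ArrayList<>();

        FakeJobVertex(int parallelism) {
            for (int i = 0; i < parallelism; i++) {
                subtasks.add(new FakeVertex());
            }
        }

        void stop() {
            for (FakeVertex vertex : subtasks) {
                vertex.stop();
            }
        }
    }

    /** Sends one stop signal at the top and counts how many executions received it. */
    static int stopAndCount(int parallelism) {
        FakeJobVertex jobVertex = new FakeJobVertex(parallelism);
        jobVertex.stop();

        int reached = 0;
        for (FakeVertex vertex : jobVertex.subtasks) {
            reached += vertex.execution.stopCalls;
        }
        return reached;
    }

    public static void main(String[] args) {
        System.out.println("executions stopped: " + stopAndCount(5));
    }
}
```

A per-layer test with mocks, as in this PR, checks one hop at a time; a test shaped like the sketch checks the whole chain at once.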




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46277659
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobType;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.api.mockito.PowerMockito;
+
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.same;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionGraph.class)
+public class ExecutionGraphSignalsTest {
+   private ExecutionJobVertex[] mockEJV = new ExecutionJobVertex[5];
+   private int[] dop = new int[] { 5, 7, 2, 11, 4 };
+   private ExecutionVertex[][] mockEV = new ExecutionVertex[mockEJV.length][];
+   private ExecutionGraph eg;
+   private Field f;
+
+   @Before
+   public void prepare() throws Exception {
+   final JobID jobId = new JobID();
+   final String jobName = "Test Job Sample Name";
+   final Configuration cfg = new Configuration();
+
+
+   assert (mockEJV.length == 5);
+   JobVertex v1 = new JobVertex("vertex1");
+   JobVertex v2 = new JobVertex("vertex2");
+   JobVertex v3 = new JobVertex("vertex3");
+   JobVertex v4 = new JobVertex("vertex4");
+   JobVertex v5 = new JobVertex("vertex5");
+
+   for(int i = 0; i < mockEJV.length; ++i) {
+   mockEJV[i] = mock(ExecutionJobVertex.class);
+
+   this.mockEV[i] = new ExecutionVertex[dop[i]];
+   for (int j = 0; j < dop[i]; ++j) {
+   this.mockEV[i][j] = mock(ExecutionVertex.class);
+   }
+
+   when(mockEJV[i].getProducedDataSets()).thenReturn(new IntermediateResult[0]);
+   when(mockEJV[i].getTaskVertices()).thenReturn(this.mockEV[i]);
+   }
+
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+   .withArguments(any(ExecutionGraph.class), same(v1), any(Integer.class).intValue(),
+   any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[0]);
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+   .withArguments(any(ExecutionGraph.class), same(v2), any(Integer.class).intValue(),
+   any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[1]);
+   PowerMockito
+   .whenNew(ExecutionJobVertex.class)
+   .withArguments(any(ExecutionGraph.class), same(v3), any(Integer.class).intValue(),
+   any(FiniteDuration.class), any(Long.class).longValue()).thenReturn(mockEJV[2]);
+   

[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46278142
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+public class ExecutionJobVertexTest {
+
+   @Test
+   public void testReceivedStopSignal() throws JobException {
--- End diff --

I'm not sure whether we really need this test. It basically tests the 
setter, right?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46279382
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.BaseTestingActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import scala.concurrent.ExecutionContext;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionVertex.class)
+public class ExecutionVertexStopTest {
+
+   @Test
+   public void testStop() throws Exception {
--- End diff --

To be honest, I'm not sure this test adds much value. It basically only 
verifies that `ExecutionVertex.stop()` delegates to `Execution.stop()`, 
given that the method is simply:
```
public void stop() {
    this.currentExecution.stop();
}
```



